Compare commits
2 commits
7aa1c33c4b
...
0ea62afd1f
Author | SHA1 | Date | |
---|---|---|---|
Edward Betts | 0ea62afd1f | ||
Edward Betts | 89b111d40c |
|
@ -5,6 +5,7 @@ import typing
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
|
import itsdangerous
|
||||||
import werkzeug
|
import werkzeug
|
||||||
from itsdangerous.url_safe import URLSafeTimedSerializer
|
from itsdangerous.url_safe import URLSafeTimedSerializer
|
||||||
|
|
||||||
|
@ -29,7 +30,7 @@ def verify_secure_token(token: str, salt: str, max_age: int) -> str | None:
|
||||||
serializer = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"])
|
serializer = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"])
|
||||||
try:
|
try:
|
||||||
data = serializer.loads(token, salt=salt, max_age=max_age)
|
data = serializer.loads(token, salt=salt, max_age=max_age)
|
||||||
except Exception:
|
except itsdangerous.exc.BadTimeSignature:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
assert isinstance(data, str)
|
assert isinstance(data, str)
|
||||||
|
@ -53,6 +54,9 @@ def require_authentication() -> werkzeug.Response | None:
|
||||||
if not flask.current_app.config.get("REQUIRE_AUTH"):
|
if not flask.current_app.config.get("REQUIRE_AUTH"):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
if flask.request.endpoint == "auth_callback":
|
||||||
|
return None
|
||||||
|
|
||||||
token = flask.request.cookies.get("auth_token")
|
token = flask.request.cookies.get("auth_token")
|
||||||
if token and verify_auth_token(token):
|
if token and verify_auth_token(token):
|
||||||
return None
|
return None
|
||||||
|
@ -77,9 +81,11 @@ def auth_callback() -> tuple[str, int] | werkzeug.Response:
|
||||||
] # The original token passed to UniAuth
|
] # The original token passed to UniAuth
|
||||||
expire_date = datetime.now() + timedelta(days=180)
|
expire_date = datetime.now() + timedelta(days=180)
|
||||||
|
|
||||||
original_url = verify_secure_token(token, salt="secure-redirect", max_age=600)
|
token_payload = verify_secure_token(token, salt="secure-redirect", max_age=600)
|
||||||
if not original_url:
|
if not token_payload:
|
||||||
return "Invalid or expired token", 400
|
return "Invalid or expired token", 400
|
||||||
|
original_url = json.loads(token_payload)["original_url"]
|
||||||
|
|
||||||
# Proceed with setting the auth_token cookie and redirecting to the original_url
|
# Proceed with setting the auth_token cookie and redirecting to the original_url
|
||||||
# This is where you set the auth_token received from UniAuth in the client's cookies
|
# This is where you set the auth_token received from UniAuth in the client's cookies
|
||||||
response = flask.make_response(flask.redirect(original_url))
|
response = flask.make_response(flask.redirect(original_url))
|
||||||
|
|
59
main.py
59
main.py
|
@ -13,7 +13,7 @@ import werkzeug
|
||||||
import UniAuth.auth
|
import UniAuth.auth
|
||||||
|
|
||||||
app = flask.Flask(__name__)
|
app = flask.Flask(__name__)
|
||||||
app.debug = True
|
app.debug = False
|
||||||
app.config.from_object("config.default")
|
app.config.from_object("config.default")
|
||||||
|
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ def login_required(f: Callable[..., Any]) -> Callable[..., Any]:
|
||||||
|
|
||||||
@functools.wraps(f)
|
@functools.wraps(f)
|
||||||
def decorated_function(*args, **kwargs) -> werkzeug.Response: # type: ignore
|
def decorated_function(*args, **kwargs) -> werkzeug.Response: # type: ignore
|
||||||
token = flask.request.cookies.get("auth_token")
|
token = flask.request.cookies.get("uniauth_token")
|
||||||
if not token or UniAuth.auth.verify_auth_token(token) is None:
|
if not token or UniAuth.auth.verify_auth_token(token) is None:
|
||||||
# Save the original URL in the session and redirect to login
|
# Save the original URL in the session and redirect to login
|
||||||
flask.session["next"] = flask.request.url
|
flask.session["next"] = flask.request.url
|
||||||
|
@ -55,40 +55,67 @@ def check_user_auth() -> dict[str, Any] | None:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/login", methods=["GET", "POST"])
|
def read_callback_url_from_token() -> str | None:
|
||||||
def login_page() -> str | werkzeug.Response | tuple[str, int]:
|
"""Parse token and extract callback URL."""
|
||||||
"""Login page."""
|
|
||||||
if flask.request.method == "GET":
|
|
||||||
return flask.render_template("login.html")
|
|
||||||
|
|
||||||
if not (user := check_user_auth()):
|
|
||||||
# Login failed: Show an error message on the login page
|
|
||||||
return flask.render_template("login.html", error="Invalid credentials")
|
|
||||||
|
|
||||||
token = flask.request.args.get("token")
|
token = flask.request.args.get("token")
|
||||||
if not token:
|
if not token:
|
||||||
return "Invalid token", 400
|
return None
|
||||||
|
|
||||||
json_data = UniAuth.auth.verify_secure_token(
|
json_data = UniAuth.auth.verify_secure_token(
|
||||||
token, salt="secure-redirect", max_age=600
|
token, salt="secure-redirect", max_age=600
|
||||||
)
|
)
|
||||||
|
if not json_data:
|
||||||
|
return None
|
||||||
assert isinstance(json_data, str)
|
assert isinstance(json_data, str)
|
||||||
token_payload = json.loads(json_data)
|
token_payload = json.loads(json_data)
|
||||||
if not token_payload:
|
|
||||||
return "Invalid token", 400
|
|
||||||
|
|
||||||
callback_url = token_payload["callback_url"]
|
callback_url = token_payload["callback_url"]
|
||||||
|
assert isinstance(callback_url, str)
|
||||||
|
|
||||||
|
return callback_url
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/login", methods=["GET", "POST"])
|
||||||
|
def login_page() -> str | werkzeug.Response | tuple[str, int]:
|
||||||
|
"""Login page."""
|
||||||
|
app.logger.info("Login page.")
|
||||||
|
if flask.request.method == "GET":
|
||||||
|
uniauth_token = flask.request.cookies.get("uniauth_token")
|
||||||
|
if (
|
||||||
|
not uniauth_token
|
||||||
|
or not UniAuth.auth.verify_auth_token(uniauth_token)
|
||||||
|
or not (callback_url := read_callback_url_from_token())
|
||||||
|
):
|
||||||
|
return flask.render_template("login.html")
|
||||||
|
token = flask.request.args["token"]
|
||||||
|
redirect_to_callback = f"{callback_url}?auth_token={uniauth_token}&next={token}"
|
||||||
|
app.logger.info(f"Redirecting to: {redirect_to_callback}")
|
||||||
|
|
||||||
|
return flask.redirect(redirect_to_callback)
|
||||||
|
|
||||||
|
if not (user := check_user_auth()):
|
||||||
|
# Login failed: Show an error message on the login page
|
||||||
|
app.logger.info("User auth failed")
|
||||||
|
return flask.render_template("login.html", error="Invalid credentials")
|
||||||
|
|
||||||
|
callback_url = read_callback_url_from_token()
|
||||||
|
if not callback_url:
|
||||||
|
return "Invalid token", 400
|
||||||
|
|
||||||
auth_token = UniAuth.auth.generate_auth_token(user["username"])
|
auth_token = UniAuth.auth.generate_auth_token(user["username"])
|
||||||
|
token = flask.request.args["token"]
|
||||||
|
|
||||||
redirect_to_callback = f"{callback_url}?auth_token={auth_token}&next={token}"
|
redirect_to_callback = f"{callback_url}?auth_token={auth_token}&next={token}"
|
||||||
|
|
||||||
|
app.logger.info(f"Redirecting to: {redirect_to_callback}")
|
||||||
|
|
||||||
# flask.flash("Welcome back! You have successfully logged in.")
|
# flask.flash("Welcome back! You have successfully logged in.")
|
||||||
|
|
||||||
expire_date = datetime.now() + timedelta(days=180)
|
expire_date = datetime.now() + timedelta(days=180)
|
||||||
|
|
||||||
response = flask.redirect(redirect_to_callback)
|
response = flask.redirect(redirect_to_callback)
|
||||||
response.set_cookie(
|
response.set_cookie(
|
||||||
"auth_token",
|
"uniauth_token",
|
||||||
token,
|
token,
|
||||||
expires=expire_date,
|
expires=expire_date,
|
||||||
httponly=True,
|
httponly=True,
|
||||||
|
|
Loading…
Reference in a new issue