diff --git a/dab_mechanic/wikidata_oauth.py b/dab_mechanic/wikidata_oauth.py index 5b048b2..a872da1 100644 --- a/dab_mechanic/wikidata_oauth.py +++ b/dab_mechanic/wikidata_oauth.py @@ -1,6 +1,10 @@ +"""Wikidata OAuth.""" + +import typing from urllib.parse import urlencode from flask import current_app, session +from requests.models import Response from requests_oauthlib import OAuth1Session WIKI_HOSTNAME = "en.wikipedia.org" @@ -8,6 +12,8 @@ API_URL = f"https://{WIKI_HOSTNAME}/w/api.php" TIMEOUT = 20 +CallParams = dict[str, str | int] + def get_edit_proxy() -> dict[str, str]: """Retrieve proxy information from config.""" @@ -18,7 +24,7 @@ def get_edit_proxy() -> dict[str, str]: return {} -def api_post_request(params: dict[str, str | int]): +def api_post_request(params: CallParams) -> Response: """HTTP Post using Oauth.""" app = current_app client_key = app.config["CLIENT_KEY"] @@ -30,10 +36,12 @@ def api_post_request(params: dict[str, str | int]): resource_owner_secret=session["owner_secret"], ) proxies = get_edit_proxy() - return oauth.post(API_URL, data=params, timeout=TIMEOUT, proxies=proxies) + r: Response = oauth.post(API_URL, data=params, timeout=TIMEOUT, proxies=proxies) + return r -def raw_request(params): +def raw_request(params: CallParams) -> Response: + """Raw request.""" app = current_app url = API_URL + "?" + urlencode(params) client_key = app.config["CLIENT_KEY"] @@ -45,42 +53,49 @@ def raw_request(params): resource_owner_secret=session["owner_secret"], ) proxies = get_edit_proxy() - return oauth.get(url, timeout=TIMEOUT, proxies=proxies) + r: Response = oauth.get(url, timeout=TIMEOUT, proxies=proxies) + return r -def api_request(params): - return raw_request(params).json() +def api_request(params: CallParams) -> dict[str, typing.Any]: + """Make API request and return object parsed from JSON.""" + return typing.cast(dict[str, typing.Any], raw_request(params).json()) -def get_token(): - params = { +def get_token() -> str: + """Get csrftoken from MediaWiki API.""" + params: CallParams = { "action": "query", "meta": "tokens", "format": "json", "formatversion": 2, } reply = api_request(params) - token = reply["query"]["tokens"]["csrftoken"] + token: str = reply["query"]["tokens"]["csrftoken"] return token -def userinfo_call(): +def userinfo_call() -> dict[str, typing.Any]: """Request user information via OAuth.""" - params = {"action": "query", "meta": "userinfo", "format": "json"} + params: CallParams = {"action": "query", "meta": "userinfo", "format": "json"} return api_request(params) -def get_username(): +def get_username() -> str | None: + """Get username for current user.""" if "owner_key" not in session: - return # not authorized + return None # not authorized if "username" in session: + assert isinstance(session["username"], str) return session["username"] reply = userinfo_call() if "query" not in reply: - return - session["username"] = reply["query"]["userinfo"]["name"] + return None + username = reply["query"]["userinfo"]["name"] + assert isinstance(username, str) + session["username"] = username - return session["username"] + return username diff --git a/templates/show_error.html b/templates/show_error.html index e75e75e..dd79c3c 100644 --- a/templates/show_error.html +++ b/templates/show_error.html @@ -7,24 +7,17 @@ {% block content %}
-

Software error: {{ tb.exception_type }}

+

Software error: {{ exception_type }}

-
{{ tb.exception }}
+
{{ exception }}
-{% set body %} -URL: {{ request.url }} - -{{ tb.plaintext | safe }} -{% endset %} - -

Submit as an issue on GitHub (requires an account with GitHub)

-

Traceback (most recent call last)

-{{ tb.render_summary(include_title=False) | safe }} +{{ summary | safe }} + +

Error in function "{{ last_frame.f_code.co_name }}": {{ last_frame_args | pprint }}

+
{{ last_frame.f_locals | pprint }}
-

Error in function "{{ last_frame.function_name }}": {{ last_frame_args | pprint }}

-
{{ last_frame.locals | pprint }}
{% endblock %} diff --git a/web_view.py b/web_view.py index aa962bc..0df40cb 100755 --- a/web_view.py +++ b/web_view.py @@ -3,16 +3,17 @@ import inspect import json import re +import sys +import traceback from typing import Optional, TypedDict -import mwparserfromhell -from pprint import pprint import flask import lxml.html +import mwparserfromhell import requests import werkzeug.exceptions from requests_oauthlib import OAuth1Session -from werkzeug.debug.tbtools import get_current_traceback +from werkzeug.debug.tbtools import DebugTraceback from werkzeug.wrappers import Response from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia @@ -28,20 +29,31 @@ awdl_url = "https://dplbot.toolforge.org/articles_with_dab_links.php" @app.before_request -def global_user(): +def global_user() -> None: """Make username available everywhere.""" flask.g.user = wikidata_oauth.get_username() @app.errorhandler(werkzeug.exceptions.InternalServerError) -def exception_handler(e): - tb = get_current_traceback() - last_frame = next(frame for frame in reversed(tb.frames) if not frame.is_library) - last_frame_args = inspect.getargs(last_frame.code) +def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]: + """Handle exception.""" + exec_type, exc_value, current_traceback = sys.exc_info() + assert exc_value + tb = DebugTraceback(exc_value) + + summary = tb.render_traceback_html(include_title=False) + exc_lines = "".join(tb._te.format_exception_only()) + + last_frame = list(traceback.walk_tb(current_traceback))[-1][0] + last_frame_args = inspect.getargs(last_frame.f_code) + return ( flask.render_template( "show_error.html", - tb=tb, + plaintext=tb.render_traceback_text(), + exception=exc_lines, + exception_type=tb._te.exc_type.__name__, + summary=summary, last_frame=last_frame, last_frame_args=last_frame_args, ), @@ -49,14 +61,15 @@ def exception_handler(e): ) -def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, int]]: +def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str, int]]: """Parse Articles With Multiple Dablinks.""" articles = [] table = root.find(".//table") + assert table is not None for tr in table: title = tr[0][0].text count_text = tr[1][0].text - assert count_text.endswith(" links") + assert title and count_text and count_text.endswith(" links") count = int(count_text[:-6]) articles.append((title, count)) @@ -65,7 +78,8 @@ def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, in @app.route("/") -def index(): +def index() -> str | Response: + """Index page.""" title = flask.request.args.get("title") exists = None if title: @@ -83,7 +97,10 @@ def index(): # articles = [line[:-1] for line in open("article_list")] return flask.render_template( - "index.html", title=title, exists=exists, articles=articles, + "index.html", + title=title, + exists=exists, + articles=articles, ) @@ -215,17 +232,17 @@ def save(enwiki: str) -> Response | str: def redirect_if_needed(enwiki: str) -> Optional[Response]: """Check if there are spaces in the article name and redirect.""" + endpoint = flask.request.endpoint + assert endpoint return ( - flask.redirect( - flask.url_for(flask.request.endpoint, enwiki=enwiki.replace(" ", "_")) - ) + flask.redirect(flask.url_for(endpoint, enwiki=enwiki.replace(" ", "_"))) if " " in enwiki else None ) @app.route("/enwiki/") -def article_page(enwiki: str) -> Response: +def article_page(enwiki: str) -> Response | str: """Article Page.""" redirect = redirect_if_needed(enwiki) if redirect: @@ -244,7 +261,8 @@ def article_page(enwiki: str) -> Response: @app.route("/oauth/start") -def start_oauth(): +def start_oauth() -> Response: + """Start OAuth.""" next_page = flask.request.args.get("next") if next_page: flask.session["after_login"] = next_page @@ -267,7 +285,8 @@ def start_oauth(): @app.route("/oauth/callback", methods=["GET"]) -def oauth_callback(): +def oauth_callback() -> Response: + """Autentication callback.""" client_key = app.config["CLIENT_KEY"] client_secret = app.config["CLIENT_SECRET"] @@ -294,11 +313,12 @@ def oauth_callback(): flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret") next_page = flask.session.get("after_login") - return flask.redirect(next_page) if next_page else flask.url_for("index") + return flask.redirect(next_page if next_page else flask.url_for("index")) @app.route("/oauth/disconnect") -def oauth_disconnect(): +def oauth_disconnect() -> Response: + """Disconnect OAuth.""" for key in "owner_key", "owner_secret", "username", "after_login": if key in flask.session: del flask.session[key]