Compare commits

..

3 commits

3 changed files with 78 additions and 50 deletions

View file

@ -1,6 +1,10 @@
"""Wikidata OAuth."""
import typing
from urllib.parse import urlencode from urllib.parse import urlencode
from flask import current_app, session from flask import current_app, session
from requests.models import Response
from requests_oauthlib import OAuth1Session from requests_oauthlib import OAuth1Session
WIKI_HOSTNAME = "en.wikipedia.org" WIKI_HOSTNAME = "en.wikipedia.org"
@ -8,6 +12,8 @@ API_URL = f"https://{WIKI_HOSTNAME}/w/api.php"
TIMEOUT = 20 TIMEOUT = 20
CallParams = dict[str, str | int]
def get_edit_proxy() -> dict[str, str]: def get_edit_proxy() -> dict[str, str]:
"""Retrieve proxy information from config.""" """Retrieve proxy information from config."""
@ -18,7 +24,7 @@ def get_edit_proxy() -> dict[str, str]:
return {} return {}
def api_post_request(params: dict[str, str | int]): def api_post_request(params: CallParams) -> Response:
"""HTTP Post using Oauth.""" """HTTP Post using Oauth."""
app = current_app app = current_app
client_key = app.config["CLIENT_KEY"] client_key = app.config["CLIENT_KEY"]
@ -30,10 +36,12 @@ def api_post_request(params: dict[str, str | int]):
resource_owner_secret=session["owner_secret"], resource_owner_secret=session["owner_secret"],
) )
proxies = get_edit_proxy() proxies = get_edit_proxy()
return oauth.post(API_URL, data=params, timeout=TIMEOUT, proxies=proxies) r: Response = oauth.post(API_URL, data=params, timeout=TIMEOUT, proxies=proxies)
return r
def raw_request(params): def raw_request(params: CallParams) -> Response:
"""Raw request."""
app = current_app app = current_app
url = API_URL + "?" + urlencode(params) url = API_URL + "?" + urlencode(params)
client_key = app.config["CLIENT_KEY"] client_key = app.config["CLIENT_KEY"]
@ -45,42 +53,49 @@ def raw_request(params):
resource_owner_secret=session["owner_secret"], resource_owner_secret=session["owner_secret"],
) )
proxies = get_edit_proxy() proxies = get_edit_proxy()
return oauth.get(url, timeout=TIMEOUT, proxies=proxies) r: Response = oauth.get(url, timeout=TIMEOUT, proxies=proxies)
return r
def api_request(params): def api_request(params: CallParams) -> dict[str, typing.Any]:
return raw_request(params).json() """Make API request and return object parsed from JSON."""
return typing.cast(dict[str, typing.Any], raw_request(params).json())
def get_token(): def get_token() -> str:
params = { """Get csrftoken from MediaWiki API."""
params: CallParams = {
"action": "query", "action": "query",
"meta": "tokens", "meta": "tokens",
"format": "json", "format": "json",
"formatversion": 2, "formatversion": 2,
} }
reply = api_request(params) reply = api_request(params)
token = reply["query"]["tokens"]["csrftoken"] token: str = reply["query"]["tokens"]["csrftoken"]
return token return token
def userinfo_call(): def userinfo_call() -> dict[str, typing.Any]:
"""Request user information via OAuth.""" """Request user information via OAuth."""
params = {"action": "query", "meta": "userinfo", "format": "json"} params: CallParams = {"action": "query", "meta": "userinfo", "format": "json"}
return api_request(params) return api_request(params)
def get_username(): def get_username() -> str | None:
"""Get username for current user."""
if "owner_key" not in session: if "owner_key" not in session:
return # not authorized return None # not authorized
if "username" in session: if "username" in session:
assert isinstance(session["username"], str)
return session["username"] return session["username"]
reply = userinfo_call() reply = userinfo_call()
if "query" not in reply: if "query" not in reply:
return return None
session["username"] = reply["query"]["userinfo"]["name"] username = reply["query"]["userinfo"]["name"]
assert isinstance(username, str)
session["username"] = username
return session["username"] return username

View file

@ -7,24 +7,17 @@
{% block content %} {% block content %}
<div class="p-2"> <div class="p-2">
<h1>Software error: {{ tb.exception_type }}</h1> <h1>Software error: {{ exception_type }}</h1>
<div> <div>
<pre>{{ tb.exception }}</pre> <pre>{{ exception }}</pre>
</div> </div>
{% set body %}
URL: {{ request.url }}
{{ tb.plaintext | safe }}
{% endset %}
<p><a class="btn btn-primary btn-lg" role="button" href="https://github.com/EdwardBetts/dab-mechanic/issues/new?title={{ tb.exception + " " + request.url | urlencode }}&body={{ body | urlencode }}">Submit as an issue on GitHub</a> (requires an account with GitHub)</p>
<h2 class="traceback">Traceback <em>(most recent call last)</em></h2> <h2 class="traceback">Traceback <em>(most recent call last)</em></h2>
{{ tb.render_summary(include_title=False) | safe }} {{ summary | safe }}
<p>Error in function "{{ last_frame.f_code.co_name }}": {{ last_frame_args | pprint }}</p>
<pre>{{ last_frame.f_locals | pprint }}</pre>
<p>Error in function "{{ last_frame.function_name }}": {{ last_frame_args | pprint }}</p>
<pre>{{ last_frame.locals | pprint }}</pre>
</div> </div>
{% endblock %} {% endblock %}

View file

@ -3,16 +3,17 @@
import inspect import inspect
import json import json
import re import re
import sys
import traceback
from typing import Optional, TypedDict from typing import Optional, TypedDict
import mwparserfromhell
from pprint import pprint
import flask import flask
import lxml.html import lxml.html
import mwparserfromhell
import requests import requests
import werkzeug.exceptions import werkzeug.exceptions
from requests_oauthlib import OAuth1Session from requests_oauthlib import OAuth1Session
from werkzeug.debug.tbtools import get_current_traceback from werkzeug.debug.tbtools import DebugTraceback
from werkzeug.wrappers import Response from werkzeug.wrappers import Response
from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia
@ -28,20 +29,31 @@ awdl_url = "https://dplbot.toolforge.org/articles_with_dab_links.php"
@app.before_request @app.before_request
def global_user(): def global_user() -> None:
"""Make username available everywhere.""" """Make username available everywhere."""
flask.g.user = wikidata_oauth.get_username() flask.g.user = wikidata_oauth.get_username()
@app.errorhandler(werkzeug.exceptions.InternalServerError) @app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e): def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
tb = get_current_traceback() """Handle exception."""
last_frame = next(frame for frame in reversed(tb.frames) if not frame.is_library) exec_type, exc_value, current_traceback = sys.exc_info()
last_frame_args = inspect.getargs(last_frame.code) assert exc_value
tb = DebugTraceback(exc_value)
summary = tb.render_traceback_html(include_title=False)
exc_lines = "".join(tb._te.format_exception_only())
last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
last_frame_args = inspect.getargs(last_frame.f_code)
return ( return (
flask.render_template( flask.render_template(
"show_error.html", "show_error.html",
tb=tb, plaintext=tb.render_traceback_text(),
exception=exc_lines,
exception_type=tb._te.exc_type.__name__,
summary=summary,
last_frame=last_frame, last_frame=last_frame,
last_frame_args=last_frame_args, last_frame_args=last_frame_args,
), ),
@ -49,14 +61,15 @@ def exception_handler(e):
) )
def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, int]]: def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str, int]]:
"""Parse Articles With Multiple Dablinks.""" """Parse Articles With Multiple Dablinks."""
articles = [] articles = []
table = root.find(".//table") table = root.find(".//table")
assert table is not None
for tr in table: for tr in table:
title = tr[0][0].text title = tr[0][0].text
count_text = tr[1][0].text count_text = tr[1][0].text
assert count_text.endswith(" links") assert title and count_text and count_text.endswith(" links")
count = int(count_text[:-6]) count = int(count_text[:-6])
articles.append((title, count)) articles.append((title, count))
@ -65,7 +78,8 @@ def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, in
@app.route("/") @app.route("/")
def index(): def index() -> str | Response:
"""Index page."""
title = flask.request.args.get("title") title = flask.request.args.get("title")
exists = None exists = None
if title: if title:
@ -83,7 +97,10 @@ def index():
# articles = [line[:-1] for line in open("article_list")] # articles = [line[:-1] for line in open("article_list")]
return flask.render_template( return flask.render_template(
"index.html", title=title, exists=exists, articles=articles, "index.html",
title=title,
exists=exists,
articles=articles,
) )
@ -215,17 +232,17 @@ def save(enwiki: str) -> Response | str:
def redirect_if_needed(enwiki: str) -> Optional[Response]: def redirect_if_needed(enwiki: str) -> Optional[Response]:
"""Check if there are spaces in the article name and redirect.""" """Check if there are spaces in the article name and redirect."""
endpoint = flask.request.endpoint
assert endpoint
return ( return (
flask.redirect( flask.redirect(flask.url_for(endpoint, enwiki=enwiki.replace(" ", "_")))
flask.url_for(flask.request.endpoint, enwiki=enwiki.replace(" ", "_"))
)
if " " in enwiki if " " in enwiki
else None else None
) )
@app.route("/enwiki/<path:enwiki>") @app.route("/enwiki/<path:enwiki>")
def article_page(enwiki: str) -> Response: def article_page(enwiki: str) -> Response | str:
"""Article Page.""" """Article Page."""
redirect = redirect_if_needed(enwiki) redirect = redirect_if_needed(enwiki)
if redirect: if redirect:
@ -244,7 +261,8 @@ def article_page(enwiki: str) -> Response:
@app.route("/oauth/start") @app.route("/oauth/start")
def start_oauth(): def start_oauth() -> Response:
"""Start OAuth."""
next_page = flask.request.args.get("next") next_page = flask.request.args.get("next")
if next_page: if next_page:
flask.session["after_login"] = next_page flask.session["after_login"] = next_page
@ -267,7 +285,8 @@ def start_oauth():
@app.route("/oauth/callback", methods=["GET"]) @app.route("/oauth/callback", methods=["GET"])
def oauth_callback(): def oauth_callback() -> Response:
"""Autentication callback."""
client_key = app.config["CLIENT_KEY"] client_key = app.config["CLIENT_KEY"]
client_secret = app.config["CLIENT_SECRET"] client_secret = app.config["CLIENT_SECRET"]
@ -294,11 +313,12 @@ def oauth_callback():
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret") flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
next_page = flask.session.get("after_login") next_page = flask.session.get("after_login")
return flask.redirect(next_page) if next_page else flask.url_for("index") return flask.redirect(next_page if next_page else flask.url_for("index"))
@app.route("/oauth/disconnect") @app.route("/oauth/disconnect")
def oauth_disconnect(): def oauth_disconnect() -> Response:
"""Disconnect OAuth."""
for key in "owner_key", "owner_secret", "username", "after_login": for key in "owner_key", "owner_secret", "username", "after_login":
if key in flask.session: if key in flask.session:
del flask.session[key] del flask.session[key]