Compare commits

..

No commits in common. "edfa0f8b0c26e9910c9827510835b01999af2f0e" and "06cf2e9f14b67f5a2ec888ca8414513db7ec7848" have entirely different histories.

3 changed files with 50 additions and 78 deletions

View file

@ -1,10 +1,6 @@
"""Wikidata OAuth."""
import typing
from urllib.parse import urlencode
from flask import current_app, session
from requests.models import Response
from requests_oauthlib import OAuth1Session
WIKI_HOSTNAME = "en.wikipedia.org"
@ -12,8 +8,6 @@ API_URL = f"https://{WIKI_HOSTNAME}/w/api.php"
TIMEOUT = 20
CallParams = dict[str, str | int]
def get_edit_proxy() -> dict[str, str]:
"""Retrieve proxy information from config."""
@ -24,7 +18,7 @@ def get_edit_proxy() -> dict[str, str]:
return {}
def api_post_request(params: CallParams) -> Response:
def api_post_request(params: dict[str, str | int]):
"""HTTP Post using Oauth."""
app = current_app
client_key = app.config["CLIENT_KEY"]
@ -36,12 +30,10 @@ def api_post_request(params: CallParams) -> Response:
resource_owner_secret=session["owner_secret"],
)
proxies = get_edit_proxy()
r: Response = oauth.post(API_URL, data=params, timeout=TIMEOUT, proxies=proxies)
return r
return oauth.post(API_URL, data=params, timeout=TIMEOUT, proxies=proxies)
def raw_request(params: CallParams) -> Response:
"""Raw request."""
def raw_request(params):
app = current_app
url = API_URL + "?" + urlencode(params)
client_key = app.config["CLIENT_KEY"]
@ -53,49 +45,42 @@ def raw_request(params: CallParams) -> Response:
resource_owner_secret=session["owner_secret"],
)
proxies = get_edit_proxy()
r: Response = oauth.get(url, timeout=TIMEOUT, proxies=proxies)
return r
return oauth.get(url, timeout=TIMEOUT, proxies=proxies)
def api_request(params: CallParams) -> dict[str, typing.Any]:
"""Make API request and return object parsed from JSON."""
return typing.cast(dict[str, typing.Any], raw_request(params).json())
def api_request(params):
return raw_request(params).json()
def get_token() -> str:
"""Get csrftoken from MediaWiki API."""
params: CallParams = {
def get_token():
params = {
"action": "query",
"meta": "tokens",
"format": "json",
"formatversion": 2,
}
reply = api_request(params)
token: str = reply["query"]["tokens"]["csrftoken"]
token = reply["query"]["tokens"]["csrftoken"]
return token
def userinfo_call() -> dict[str, typing.Any]:
def userinfo_call():
"""Request user information via OAuth."""
params: CallParams = {"action": "query", "meta": "userinfo", "format": "json"}
params = {"action": "query", "meta": "userinfo", "format": "json"}
return api_request(params)
def get_username() -> str | None:
"""Get username for current user."""
def get_username():
if "owner_key" not in session:
return None # not authorized
return # not authorized
if "username" in session:
assert isinstance(session["username"], str)
return session["username"]
reply = userinfo_call()
if "query" not in reply:
return None
username = reply["query"]["userinfo"]["name"]
assert isinstance(username, str)
session["username"] = username
return
session["username"] = reply["query"]["userinfo"]["name"]
return username
return session["username"]

View file

@ -7,17 +7,24 @@
{% block content %}
<div class="p-2">
<h1>Software error: {{ exception_type }}</h1>
<h1>Software error: {{ tb.exception_type }}</h1>
<div>
<pre>{{ exception }}</pre>
<pre>{{ tb.exception }}</pre>
</div>
{% set body %}
URL: {{ request.url }}
{{ tb.plaintext | safe }}
{% endset %}
<p><a class="btn btn-primary btn-lg" role="button" href="https://github.com/EdwardBetts/dab-mechanic/issues/new?title={{ tb.exception + " " + request.url | urlencode }}&body={{ body | urlencode }}">Submit as an issue on GitHub</a> (requires an account with GitHub)</p>
<h2 class="traceback">Traceback <em>(most recent call last)</em></h2>
{{ summary | safe }}
<p>Error in function "{{ last_frame.f_code.co_name }}": {{ last_frame_args | pprint }}</p>
<pre>{{ last_frame.f_locals | pprint }}</pre>
{{ tb.render_summary(include_title=False) | safe }}
<p>Error in function "{{ last_frame.function_name }}": {{ last_frame_args | pprint }}</p>
<pre>{{ last_frame.locals | pprint }}</pre>
</div>
{% endblock %}

View file

@ -3,17 +3,16 @@
import inspect
import json
import re
import sys
import traceback
from typing import Optional, TypedDict
import mwparserfromhell
from pprint import pprint
import flask
import lxml.html
import mwparserfromhell
import requests
import werkzeug.exceptions
from requests_oauthlib import OAuth1Session
from werkzeug.debug.tbtools import DebugTraceback
from werkzeug.debug.tbtools import get_current_traceback
from werkzeug.wrappers import Response
from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia
@ -29,31 +28,20 @@ awdl_url = "https://dplbot.toolforge.org/articles_with_dab_links.php"
@app.before_request
def global_user() -> None:
def global_user():
"""Make username available everywhere."""
flask.g.user = wikidata_oauth.get_username()
@app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
"""Handle exception."""
exec_type, exc_value, current_traceback = sys.exc_info()
assert exc_value
tb = DebugTraceback(exc_value)
summary = tb.render_traceback_html(include_title=False)
exc_lines = "".join(tb._te.format_exception_only())
last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
last_frame_args = inspect.getargs(last_frame.f_code)
def exception_handler(e):
tb = get_current_traceback()
last_frame = next(frame for frame in reversed(tb.frames) if not frame.is_library)
last_frame_args = inspect.getargs(last_frame.code)
return (
flask.render_template(
"show_error.html",
plaintext=tb.render_traceback_text(),
exception=exc_lines,
exception_type=tb._te.exc_type.__name__,
summary=summary,
tb=tb,
last_frame=last_frame,
last_frame_args=last_frame_args,
),
@ -61,15 +49,14 @@ def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str,
)
def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str, int]]:
def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, int]]:
"""Parse Articles With Multiple Dablinks."""
articles = []
table = root.find(".//table")
assert table is not None
for tr in table:
title = tr[0][0].text
count_text = tr[1][0].text
assert title and count_text and count_text.endswith(" links")
assert count_text.endswith(" links")
count = int(count_text[:-6])
articles.append((title, count))
@ -78,8 +65,7 @@ def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str
@app.route("/")
def index() -> str | Response:
"""Index page."""
def index():
title = flask.request.args.get("title")
exists = None
if title:
@ -97,10 +83,7 @@ def index() -> str | Response:
# articles = [line[:-1] for line in open("article_list")]
return flask.render_template(
"index.html",
title=title,
exists=exists,
articles=articles,
"index.html", title=title, exists=exists, articles=articles,
)
@ -232,17 +215,17 @@ def save(enwiki: str) -> Response | str:
def redirect_if_needed(enwiki: str) -> Optional[Response]:
"""Check if there are spaces in the article name and redirect."""
endpoint = flask.request.endpoint
assert endpoint
return (
flask.redirect(flask.url_for(endpoint, enwiki=enwiki.replace(" ", "_")))
flask.redirect(
flask.url_for(flask.request.endpoint, enwiki=enwiki.replace(" ", "_"))
)
if " " in enwiki
else None
)
@app.route("/enwiki/<path:enwiki>")
def article_page(enwiki: str) -> Response | str:
def article_page(enwiki: str) -> Response:
"""Article Page."""
redirect = redirect_if_needed(enwiki)
if redirect:
@ -261,8 +244,7 @@ def article_page(enwiki: str) -> Response | str:
@app.route("/oauth/start")
def start_oauth() -> Response:
"""Start OAuth."""
def start_oauth():
next_page = flask.request.args.get("next")
if next_page:
flask.session["after_login"] = next_page
@ -285,8 +267,7 @@ def start_oauth() -> Response:
@app.route("/oauth/callback", methods=["GET"])
def oauth_callback() -> Response:
"""Autentication callback."""
def oauth_callback():
client_key = app.config["CLIENT_KEY"]
client_secret = app.config["CLIENT_SECRET"]
@ -313,12 +294,11 @@ def oauth_callback() -> Response:
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
next_page = flask.session.get("after_login")
return flask.redirect(next_page if next_page else flask.url_for("index"))
return flask.redirect(next_page) if next_page else flask.url_for("index")
@app.route("/oauth/disconnect")
def oauth_disconnect() -> Response:
"""Disconnect OAuth."""
def oauth_disconnect():
for key in "owner_key", "owner_secret", "username", "after_login":
if key in flask.session:
del flask.session[key]