Merge branch 'main' into one-at-a-time
This commit is contained in:
commit
edfa0f8b0c
|
@ -1,6 +1,10 @@
|
|||
"""Wikidata OAuth."""
|
||||
|
||||
import typing
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from flask import current_app, session
|
||||
from requests.models import Response
|
||||
from requests_oauthlib import OAuth1Session
|
||||
|
||||
WIKI_HOSTNAME = "en.wikipedia.org"
|
||||
|
@ -8,6 +12,8 @@ API_URL = f"https://{WIKI_HOSTNAME}/w/api.php"
|
|||
|
||||
TIMEOUT = 20
|
||||
|
||||
CallParams = dict[str, str | int]
|
||||
|
||||
|
||||
def get_edit_proxy() -> dict[str, str]:
|
||||
"""Retrieve proxy information from config."""
|
||||
|
@ -18,7 +24,7 @@ def get_edit_proxy() -> dict[str, str]:
|
|||
return {}
|
||||
|
||||
|
||||
def api_post_request(params: dict[str, str | int]):
|
||||
def api_post_request(params: CallParams) -> Response:
|
||||
"""HTTP Post using Oauth."""
|
||||
app = current_app
|
||||
client_key = app.config["CLIENT_KEY"]
|
||||
|
@ -30,10 +36,12 @@ def api_post_request(params: dict[str, str | int]):
|
|||
resource_owner_secret=session["owner_secret"],
|
||||
)
|
||||
proxies = get_edit_proxy()
|
||||
return oauth.post(API_URL, data=params, timeout=TIMEOUT, proxies=proxies)
|
||||
r: Response = oauth.post(API_URL, data=params, timeout=TIMEOUT, proxies=proxies)
|
||||
return r
|
||||
|
||||
|
||||
def raw_request(params):
|
||||
def raw_request(params: CallParams) -> Response:
|
||||
"""Raw request."""
|
||||
app = current_app
|
||||
url = API_URL + "?" + urlencode(params)
|
||||
client_key = app.config["CLIENT_KEY"]
|
||||
|
@ -45,42 +53,49 @@ def raw_request(params):
|
|||
resource_owner_secret=session["owner_secret"],
|
||||
)
|
||||
proxies = get_edit_proxy()
|
||||
return oauth.get(url, timeout=TIMEOUT, proxies=proxies)
|
||||
r: Response = oauth.get(url, timeout=TIMEOUT, proxies=proxies)
|
||||
return r
|
||||
|
||||
|
||||
def api_request(params):
|
||||
return raw_request(params).json()
|
||||
def api_request(params: CallParams) -> dict[str, typing.Any]:
|
||||
"""Make API request and return object parsed from JSON."""
|
||||
return typing.cast(dict[str, typing.Any], raw_request(params).json())
|
||||
|
||||
|
||||
def get_token():
|
||||
params = {
|
||||
def get_token() -> str:
|
||||
"""Get csrftoken from MediaWiki API."""
|
||||
params: CallParams = {
|
||||
"action": "query",
|
||||
"meta": "tokens",
|
||||
"format": "json",
|
||||
"formatversion": 2,
|
||||
}
|
||||
reply = api_request(params)
|
||||
token = reply["query"]["tokens"]["csrftoken"]
|
||||
token: str = reply["query"]["tokens"]["csrftoken"]
|
||||
|
||||
return token
|
||||
|
||||
|
||||
def userinfo_call():
|
||||
def userinfo_call() -> dict[str, typing.Any]:
|
||||
"""Request user information via OAuth."""
|
||||
params = {"action": "query", "meta": "userinfo", "format": "json"}
|
||||
params: CallParams = {"action": "query", "meta": "userinfo", "format": "json"}
|
||||
return api_request(params)
|
||||
|
||||
|
||||
def get_username():
|
||||
def get_username() -> str | None:
|
||||
"""Get username for current user."""
|
||||
if "owner_key" not in session:
|
||||
return # not authorized
|
||||
return None # not authorized
|
||||
|
||||
if "username" in session:
|
||||
assert isinstance(session["username"], str)
|
||||
return session["username"]
|
||||
|
||||
reply = userinfo_call()
|
||||
if "query" not in reply:
|
||||
return
|
||||
session["username"] = reply["query"]["userinfo"]["name"]
|
||||
return None
|
||||
username = reply["query"]["userinfo"]["name"]
|
||||
assert isinstance(username, str)
|
||||
session["username"] = username
|
||||
|
||||
return session["username"]
|
||||
return username
|
||||
|
|
|
@ -7,24 +7,17 @@
|
|||
{% block content %}
|
||||
<div class="p-2">
|
||||
|
||||
<h1>Software error: {{ tb.exception_type }}</h1>
|
||||
<h1>Software error: {{ exception_type }}</h1>
|
||||
<div>
|
||||
<pre>{{ tb.exception }}</pre>
|
||||
<pre>{{ exception }}</pre>
|
||||
</div>
|
||||
|
||||
{% set body %}
|
||||
URL: {{ request.url }}
|
||||
|
||||
{{ tb.plaintext | safe }}
|
||||
{% endset %}
|
||||
|
||||
<p><a class="btn btn-primary btn-lg" role="button" href="https://github.com/EdwardBetts/dab-mechanic/issues/new?title={{ tb.exception + " " + request.url | urlencode }}&body={{ body | urlencode }}">Submit as an issue on GitHub</a> (requires an account with GitHub)</p>
|
||||
|
||||
<h2 class="traceback">Traceback <em>(most recent call last)</em></h2>
|
||||
{{ tb.render_summary(include_title=False) | safe }}
|
||||
{{ summary | safe }}
|
||||
|
||||
<p>Error in function "{{ last_frame.f_code.co_name }}": {{ last_frame_args | pprint }}</p>
|
||||
<pre>{{ last_frame.f_locals | pprint }}</pre>
|
||||
|
||||
<p>Error in function "{{ last_frame.function_name }}": {{ last_frame_args | pprint }}</p>
|
||||
<pre>{{ last_frame.locals | pprint }}</pre>
|
||||
</div>
|
||||
|
||||
{% endblock %}
|
||||
|
|
62
web_view.py
62
web_view.py
|
@ -3,16 +3,17 @@
|
|||
import inspect
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import traceback
|
||||
from typing import Optional, TypedDict
|
||||
import mwparserfromhell
|
||||
from pprint import pprint
|
||||
|
||||
import flask
|
||||
import lxml.html
|
||||
import mwparserfromhell
|
||||
import requests
|
||||
import werkzeug.exceptions
|
||||
from requests_oauthlib import OAuth1Session
|
||||
from werkzeug.debug.tbtools import get_current_traceback
|
||||
from werkzeug.debug.tbtools import DebugTraceback
|
||||
from werkzeug.wrappers import Response
|
||||
|
||||
from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia
|
||||
|
@ -28,20 +29,31 @@ awdl_url = "https://dplbot.toolforge.org/articles_with_dab_links.php"
|
|||
|
||||
|
||||
@app.before_request
|
||||
def global_user():
|
||||
def global_user() -> None:
|
||||
"""Make username available everywhere."""
|
||||
flask.g.user = wikidata_oauth.get_username()
|
||||
|
||||
|
||||
@app.errorhandler(werkzeug.exceptions.InternalServerError)
|
||||
def exception_handler(e):
|
||||
tb = get_current_traceback()
|
||||
last_frame = next(frame for frame in reversed(tb.frames) if not frame.is_library)
|
||||
last_frame_args = inspect.getargs(last_frame.code)
|
||||
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
|
||||
"""Handle exception."""
|
||||
exec_type, exc_value, current_traceback = sys.exc_info()
|
||||
assert exc_value
|
||||
tb = DebugTraceback(exc_value)
|
||||
|
||||
summary = tb.render_traceback_html(include_title=False)
|
||||
exc_lines = "".join(tb._te.format_exception_only())
|
||||
|
||||
last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
|
||||
last_frame_args = inspect.getargs(last_frame.f_code)
|
||||
|
||||
return (
|
||||
flask.render_template(
|
||||
"show_error.html",
|
||||
tb=tb,
|
||||
plaintext=tb.render_traceback_text(),
|
||||
exception=exc_lines,
|
||||
exception_type=tb._te.exc_type.__name__,
|
||||
summary=summary,
|
||||
last_frame=last_frame,
|
||||
last_frame_args=last_frame_args,
|
||||
),
|
||||
|
@ -49,14 +61,15 @@ def exception_handler(e):
|
|||
)
|
||||
|
||||
|
||||
def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, int]]:
|
||||
def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str, int]]:
|
||||
"""Parse Articles With Multiple Dablinks."""
|
||||
articles = []
|
||||
table = root.find(".//table")
|
||||
assert table is not None
|
||||
for tr in table:
|
||||
title = tr[0][0].text
|
||||
count_text = tr[1][0].text
|
||||
assert count_text.endswith(" links")
|
||||
assert title and count_text and count_text.endswith(" links")
|
||||
count = int(count_text[:-6])
|
||||
|
||||
articles.append((title, count))
|
||||
|
@ -65,7 +78,8 @@ def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, in
|
|||
|
||||
|
||||
@app.route("/")
|
||||
def index():
|
||||
def index() -> str | Response:
|
||||
"""Index page."""
|
||||
title = flask.request.args.get("title")
|
||||
exists = None
|
||||
if title:
|
||||
|
@ -83,7 +97,10 @@ def index():
|
|||
# articles = [line[:-1] for line in open("article_list")]
|
||||
|
||||
return flask.render_template(
|
||||
"index.html", title=title, exists=exists, articles=articles,
|
||||
"index.html",
|
||||
title=title,
|
||||
exists=exists,
|
||||
articles=articles,
|
||||
)
|
||||
|
||||
|
||||
|
@ -215,17 +232,17 @@ def save(enwiki: str) -> Response | str:
|
|||
|
||||
def redirect_if_needed(enwiki: str) -> Optional[Response]:
|
||||
"""Check if there are spaces in the article name and redirect."""
|
||||
endpoint = flask.request.endpoint
|
||||
assert endpoint
|
||||
return (
|
||||
flask.redirect(
|
||||
flask.url_for(flask.request.endpoint, enwiki=enwiki.replace(" ", "_"))
|
||||
)
|
||||
flask.redirect(flask.url_for(endpoint, enwiki=enwiki.replace(" ", "_")))
|
||||
if " " in enwiki
|
||||
else None
|
||||
)
|
||||
|
||||
|
||||
@app.route("/enwiki/<path:enwiki>")
|
||||
def article_page(enwiki: str) -> Response:
|
||||
def article_page(enwiki: str) -> Response | str:
|
||||
"""Article Page."""
|
||||
redirect = redirect_if_needed(enwiki)
|
||||
if redirect:
|
||||
|
@ -244,7 +261,8 @@ def article_page(enwiki: str) -> Response:
|
|||
|
||||
|
||||
@app.route("/oauth/start")
|
||||
def start_oauth():
|
||||
def start_oauth() -> Response:
|
||||
"""Start OAuth."""
|
||||
next_page = flask.request.args.get("next")
|
||||
if next_page:
|
||||
flask.session["after_login"] = next_page
|
||||
|
@ -267,7 +285,8 @@ def start_oauth():
|
|||
|
||||
|
||||
@app.route("/oauth/callback", methods=["GET"])
|
||||
def oauth_callback():
|
||||
def oauth_callback() -> Response:
|
||||
"""Autentication callback."""
|
||||
client_key = app.config["CLIENT_KEY"]
|
||||
client_secret = app.config["CLIENT_SECRET"]
|
||||
|
||||
|
@ -294,11 +313,12 @@ def oauth_callback():
|
|||
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
||||
|
||||
next_page = flask.session.get("after_login")
|
||||
return flask.redirect(next_page) if next_page else flask.url_for("index")
|
||||
return flask.redirect(next_page if next_page else flask.url_for("index"))
|
||||
|
||||
|
||||
@app.route("/oauth/disconnect")
|
||||
def oauth_disconnect():
|
||||
def oauth_disconnect() -> Response:
|
||||
"""Disconnect OAuth."""
|
||||
for key in "owner_key", "owner_secret", "username", "after_login":
|
||||
if key in flask.session:
|
||||
del flask.session[key]
|
||||
|
|
Loading…
Reference in a new issue