Compare commits
2 commits
3d1ee54308
...
0ff82e1c05
Author | SHA1 | Date | |
---|---|---|---|
Edward Betts | 0ff82e1c05 | ||
Edward Betts | 1a479dacbc |
|
@ -1,11 +1,17 @@
|
||||||
|
"""Wikidata OAuth."""
|
||||||
|
|
||||||
|
import typing
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
from flask import current_app, session
|
from flask import current_app, session
|
||||||
|
from requests.models import Response
|
||||||
from requests_oauthlib import OAuth1Session
|
from requests_oauthlib import OAuth1Session
|
||||||
|
|
||||||
wiki_hostname = "en.wikipedia.org"
|
wiki_hostname = "en.wikipedia.org"
|
||||||
api_url = f"https://{wiki_hostname}/w/api.php"
|
api_url = f"https://{wiki_hostname}/w/api.php"
|
||||||
|
|
||||||
|
CallParams = dict[str, str | int]
|
||||||
|
|
||||||
|
|
||||||
def get_edit_proxy() -> dict[str, str]:
|
def get_edit_proxy() -> dict[str, str]:
|
||||||
"""Retrieve proxy information from config."""
|
"""Retrieve proxy information from config."""
|
||||||
|
@ -16,7 +22,7 @@ def get_edit_proxy() -> dict[str, str]:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def api_post_request(params: dict[str, str | int]):
|
def api_post_request(params: CallParams) -> Response:
|
||||||
"""HTTP Post using Oauth."""
|
"""HTTP Post using Oauth."""
|
||||||
app = current_app
|
app = current_app
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
|
@ -27,11 +33,12 @@ def api_post_request(params: dict[str, str | int]):
|
||||||
resource_owner_key=session["owner_key"],
|
resource_owner_key=session["owner_key"],
|
||||||
resource_owner_secret=session["owner_secret"],
|
resource_owner_secret=session["owner_secret"],
|
||||||
)
|
)
|
||||||
proxies = get_edit_proxy()
|
r: Response = oauth.post(api_url, data=params, timeout=10, proxies=get_edit_proxy())
|
||||||
return oauth.post(api_url, data=params, timeout=10, proxies=proxies)
|
return r
|
||||||
|
|
||||||
|
|
||||||
def raw_request(params):
|
def raw_request(params: CallParams) -> Response:
|
||||||
|
"""Raw request."""
|
||||||
app = current_app
|
app = current_app
|
||||||
url = api_url + "?" + urlencode(params)
|
url = api_url + "?" + urlencode(params)
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
|
@ -42,43 +49,49 @@ def raw_request(params):
|
||||||
resource_owner_key=session["owner_key"],
|
resource_owner_key=session["owner_key"],
|
||||||
resource_owner_secret=session["owner_secret"],
|
resource_owner_secret=session["owner_secret"],
|
||||||
)
|
)
|
||||||
proxies = get_edit_proxy()
|
r: Response = oauth.get(url, timeout=10, proxies=get_edit_proxy())
|
||||||
return oauth.get(url, timeout=10, proxies=proxies)
|
return r
|
||||||
|
|
||||||
|
|
||||||
def api_request(params):
|
def api_request(params: CallParams) -> dict[str, typing.Any]:
|
||||||
return raw_request(params).json()
|
"""Make API request and return object parsed from JSON."""
|
||||||
|
return typing.cast(dict[str, typing.Any], raw_request(params).json())
|
||||||
|
|
||||||
|
|
||||||
def get_token():
|
def get_token() -> str:
|
||||||
params = {
|
"""Get csrftoken from MediaWiki API."""
|
||||||
|
params: CallParams = {
|
||||||
"action": "query",
|
"action": "query",
|
||||||
"meta": "tokens",
|
"meta": "tokens",
|
||||||
"format": "json",
|
"format": "json",
|
||||||
"formatversion": 2,
|
"formatversion": 2,
|
||||||
}
|
}
|
||||||
reply = api_request(params)
|
reply = api_request(params)
|
||||||
token = reply["query"]["tokens"]["csrftoken"]
|
token: str = reply["query"]["tokens"]["csrftoken"]
|
||||||
|
|
||||||
return token
|
return token
|
||||||
|
|
||||||
|
|
||||||
def userinfo_call():
|
def userinfo_call() -> dict[str, typing.Any]:
|
||||||
"""Request user information via OAuth."""
|
"""Request user information via OAuth."""
|
||||||
params = {"action": "query", "meta": "userinfo", "format": "json"}
|
params: CallParams = {"action": "query", "meta": "userinfo", "format": "json"}
|
||||||
return api_request(params)
|
return api_request(params)
|
||||||
|
|
||||||
|
|
||||||
def get_username():
|
def get_username() -> str | None:
|
||||||
|
"""Get username for current user."""
|
||||||
if "owner_key" not in session:
|
if "owner_key" not in session:
|
||||||
return # not authorized
|
return None # not authorized
|
||||||
|
|
||||||
if "username" in session:
|
if "username" in session:
|
||||||
|
assert isinstance(session["username"], str)
|
||||||
return session["username"]
|
return session["username"]
|
||||||
|
|
||||||
reply = userinfo_call()
|
reply = userinfo_call()
|
||||||
if "query" not in reply:
|
if "query" not in reply:
|
||||||
return
|
return None
|
||||||
session["username"] = reply["query"]["userinfo"]["name"]
|
username = reply["query"]["userinfo"]["name"]
|
||||||
|
assert isinstance(username, str)
|
||||||
|
session["username"] = username
|
||||||
|
|
||||||
return session["username"]
|
return username
|
||||||
|
|
|
@ -7,24 +7,17 @@
|
||||||
{% block content %}
|
{% block content %}
|
||||||
<div class="p-2">
|
<div class="p-2">
|
||||||
|
|
||||||
<h1>Software error: {{ tb.exception_type }}</h1>
|
<h1>Software error: {{ exception_type }}</h1>
|
||||||
<div>
|
<div>
|
||||||
<pre>{{ tb.exception }}</pre>
|
<pre>{{ exception }}</pre>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{% set body %}
|
|
||||||
URL: {{ request.url }}
|
|
||||||
|
|
||||||
{{ tb.plaintext | safe }}
|
|
||||||
{% endset %}
|
|
||||||
|
|
||||||
<p><a class="btn btn-primary btn-lg" role="button" href="https://github.com/EdwardBetts/dab-mechanic/issues/new?title={{ tb.exception + " " + request.url | urlencode }}&body={{ body | urlencode }}">Submit as an issue on GitHub</a> (requires an account with GitHub)</p>
|
|
||||||
|
|
||||||
<h2 class="traceback">Traceback <em>(most recent call last)</em></h2>
|
<h2 class="traceback">Traceback <em>(most recent call last)</em></h2>
|
||||||
{{ tb.render_summary(include_title=False) | safe }}
|
{{ summary | safe }}
|
||||||
|
|
||||||
|
<p>Error in function "{{ last_frame.f_code.co_name }}": {{ last_frame_args | pprint }}</p>
|
||||||
|
<pre>{{ last_frame.f_locals | pprint }}</pre>
|
||||||
|
|
||||||
<p>Error in function "{{ last_frame.function_name }}": {{ last_frame_args | pprint }}</p>
|
|
||||||
<pre>{{ last_frame.locals | pprint }}</pre>
|
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{% endblock %}
|
{% endblock %}
|
||||||
|
|
54
web_view.py
54
web_view.py
|
@ -3,6 +3,8 @@
|
||||||
import inspect
|
import inspect
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
|
@ -10,7 +12,7 @@ import lxml.html
|
||||||
import requests
|
import requests
|
||||||
import werkzeug.exceptions
|
import werkzeug.exceptions
|
||||||
from requests_oauthlib import OAuth1Session
|
from requests_oauthlib import OAuth1Session
|
||||||
from werkzeug.debug.tbtools import get_current_traceback
|
from werkzeug.debug.tbtools import DebugTraceback
|
||||||
from werkzeug.wrappers import Response
|
from werkzeug.wrappers import Response
|
||||||
|
|
||||||
from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia
|
from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia
|
||||||
|
@ -26,20 +28,31 @@ awdl_url = "https://dplbot.toolforge.org/articles_with_dab_links.php"
|
||||||
|
|
||||||
|
|
||||||
@app.before_request
|
@app.before_request
|
||||||
def global_user():
|
def global_user() -> None:
|
||||||
"""Make username available everywhere."""
|
"""Make username available everywhere."""
|
||||||
flask.g.user = wikidata_oauth.get_username()
|
flask.g.user = wikidata_oauth.get_username()
|
||||||
|
|
||||||
|
|
||||||
@app.errorhandler(werkzeug.exceptions.InternalServerError)
|
@app.errorhandler(werkzeug.exceptions.InternalServerError)
|
||||||
def exception_handler(e):
|
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
|
||||||
tb = get_current_traceback()
|
"""Handle exception."""
|
||||||
last_frame = next(frame for frame in reversed(tb.frames) if not frame.is_library)
|
exec_type, exc_value, current_traceback = sys.exc_info()
|
||||||
last_frame_args = inspect.getargs(last_frame.code)
|
assert exc_value
|
||||||
|
tb = DebugTraceback(exc_value)
|
||||||
|
|
||||||
|
summary = tb.render_traceback_html(include_title=False)
|
||||||
|
exc_lines = "".join(tb._te.format_exception_only())
|
||||||
|
|
||||||
|
last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
|
||||||
|
last_frame_args = inspect.getargs(last_frame.f_code)
|
||||||
|
|
||||||
return (
|
return (
|
||||||
flask.render_template(
|
flask.render_template(
|
||||||
"show_error.html",
|
"show_error.html",
|
||||||
tb=tb,
|
plaintext=tb.render_traceback_text(),
|
||||||
|
exception=exc_lines,
|
||||||
|
exception_type=tb._te.exc_type.__name__,
|
||||||
|
summary=summary,
|
||||||
last_frame=last_frame,
|
last_frame=last_frame,
|
||||||
last_frame_args=last_frame_args,
|
last_frame_args=last_frame_args,
|
||||||
),
|
),
|
||||||
|
@ -47,14 +60,15 @@ def exception_handler(e):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, int]]:
|
def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str, int]]:
|
||||||
"""Parse Articles With Multiple Dablinks."""
|
"""Parse Articles With Multiple Dablinks."""
|
||||||
articles = []
|
articles = []
|
||||||
table = root.find(".//table")
|
table = root.find(".//table")
|
||||||
|
assert table is not None
|
||||||
for tr in table:
|
for tr in table:
|
||||||
title = tr[0][0].text
|
title = tr[0][0].text
|
||||||
count_text = tr[1][0].text
|
count_text = tr[1][0].text
|
||||||
assert count_text.endswith(" links")
|
assert title and count_text and count_text.endswith(" links")
|
||||||
count = int(count_text[:-6])
|
count = int(count_text[:-6])
|
||||||
|
|
||||||
articles.append((title, count))
|
articles.append((title, count))
|
||||||
|
@ -63,7 +77,8 @@ def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, in
|
||||||
|
|
||||||
|
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
def index():
|
def index() -> str:
|
||||||
|
"""Index page."""
|
||||||
r = requests.get(awdl_url, params={"limit": 100})
|
r = requests.get(awdl_url, params={"limit": 100})
|
||||||
root = lxml.html.fromstring(r.content)
|
root = lxml.html.fromstring(r.content)
|
||||||
articles = parse_articles_with_dab_links(root)
|
articles = parse_articles_with_dab_links(root)
|
||||||
|
@ -125,17 +140,17 @@ def save(enwiki: str) -> Response | str:
|
||||||
|
|
||||||
def redirect_if_needed(enwiki: str) -> Optional[Response]:
|
def redirect_if_needed(enwiki: str) -> Optional[Response]:
|
||||||
"""Check if there are spaces in the article name and redirect."""
|
"""Check if there are spaces in the article name and redirect."""
|
||||||
|
endpoint = flask.request.endpoint
|
||||||
|
assert endpoint
|
||||||
return (
|
return (
|
||||||
flask.redirect(
|
flask.redirect(flask.url_for(endpoint, enwiki=enwiki.replace(" ", "_")))
|
||||||
flask.url_for(flask.request.endpoint, enwiki=enwiki.replace(" ", "_"))
|
|
||||||
)
|
|
||||||
if " " in enwiki
|
if " " in enwiki
|
||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/enwiki/<path:enwiki>")
|
@app.route("/enwiki/<path:enwiki>")
|
||||||
def article_page(enwiki: str) -> Response:
|
def article_page(enwiki: str) -> Response | str:
|
||||||
"""Article Page."""
|
"""Article Page."""
|
||||||
redirect = redirect_if_needed(enwiki)
|
redirect = redirect_if_needed(enwiki)
|
||||||
if redirect:
|
if redirect:
|
||||||
|
@ -151,7 +166,8 @@ def article_page(enwiki: str) -> Response:
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/start")
|
@app.route("/oauth/start")
|
||||||
def start_oauth():
|
def start_oauth() -> Response:
|
||||||
|
"""Start OAuth."""
|
||||||
next_page = flask.request.args.get("next")
|
next_page = flask.request.args.get("next")
|
||||||
if next_page:
|
if next_page:
|
||||||
flask.session["after_login"] = next_page
|
flask.session["after_login"] = next_page
|
||||||
|
@ -174,7 +190,8 @@ def start_oauth():
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/callback", methods=["GET"])
|
@app.route("/oauth/callback", methods=["GET"])
|
||||||
def oauth_callback():
|
def oauth_callback() -> Response:
|
||||||
|
"""Autentication callback."""
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
client_secret = app.config["CLIENT_SECRET"]
|
client_secret = app.config["CLIENT_SECRET"]
|
||||||
|
|
||||||
|
@ -201,11 +218,12 @@ def oauth_callback():
|
||||||
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
||||||
|
|
||||||
next_page = flask.session.get("after_login")
|
next_page = flask.session.get("after_login")
|
||||||
return flask.redirect(next_page) if next_page else flask.url_for("index")
|
return flask.redirect(next_page if next_page else flask.url_for("index"))
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/disconnect")
|
@app.route("/oauth/disconnect")
|
||||||
def oauth_disconnect():
|
def oauth_disconnect() -> Response:
|
||||||
|
"""Disconnect OAuth."""
|
||||||
for key in "owner_key", "owner_secret", "username", "after_login":
|
for key in "owner_key", "owner_secret", "username", "after_login":
|
||||||
if key in flask.session:
|
if key in flask.session:
|
||||||
del flask.session[key]
|
del flask.session[key]
|
||||||
|
|
Loading…
Reference in a new issue