Compare commits
No commits in common. "0ff82e1c05486b4b2f705722cd1ee98f431b76ab" and "3d1ee543084db217df6dc092062dbe33db0cdd6a" have entirely different histories.
0ff82e1c05
...
3d1ee54308
|
@ -1,17 +1,11 @@
|
||||||
"""Wikidata OAuth."""
|
|
||||||
|
|
||||||
import typing
|
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
from flask import current_app, session
|
from flask import current_app, session
|
||||||
from requests.models import Response
|
|
||||||
from requests_oauthlib import OAuth1Session
|
from requests_oauthlib import OAuth1Session
|
||||||
|
|
||||||
wiki_hostname = "en.wikipedia.org"
|
wiki_hostname = "en.wikipedia.org"
|
||||||
api_url = f"https://{wiki_hostname}/w/api.php"
|
api_url = f"https://{wiki_hostname}/w/api.php"
|
||||||
|
|
||||||
CallParams = dict[str, str | int]
|
|
||||||
|
|
||||||
|
|
||||||
def get_edit_proxy() -> dict[str, str]:
|
def get_edit_proxy() -> dict[str, str]:
|
||||||
"""Retrieve proxy information from config."""
|
"""Retrieve proxy information from config."""
|
||||||
|
@ -22,7 +16,7 @@ def get_edit_proxy() -> dict[str, str]:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def api_post_request(params: CallParams) -> Response:
|
def api_post_request(params: dict[str, str | int]):
|
||||||
"""HTTP Post using Oauth."""
|
"""HTTP Post using Oauth."""
|
||||||
app = current_app
|
app = current_app
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
|
@ -33,12 +27,11 @@ def api_post_request(params: CallParams) -> Response:
|
||||||
resource_owner_key=session["owner_key"],
|
resource_owner_key=session["owner_key"],
|
||||||
resource_owner_secret=session["owner_secret"],
|
resource_owner_secret=session["owner_secret"],
|
||||||
)
|
)
|
||||||
r: Response = oauth.post(api_url, data=params, timeout=10, proxies=get_edit_proxy())
|
proxies = get_edit_proxy()
|
||||||
return r
|
return oauth.post(api_url, data=params, timeout=10, proxies=proxies)
|
||||||
|
|
||||||
|
|
||||||
def raw_request(params: CallParams) -> Response:
|
def raw_request(params):
|
||||||
"""Raw request."""
|
|
||||||
app = current_app
|
app = current_app
|
||||||
url = api_url + "?" + urlencode(params)
|
url = api_url + "?" + urlencode(params)
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
|
@ -49,49 +42,43 @@ def raw_request(params: CallParams) -> Response:
|
||||||
resource_owner_key=session["owner_key"],
|
resource_owner_key=session["owner_key"],
|
||||||
resource_owner_secret=session["owner_secret"],
|
resource_owner_secret=session["owner_secret"],
|
||||||
)
|
)
|
||||||
r: Response = oauth.get(url, timeout=10, proxies=get_edit_proxy())
|
proxies = get_edit_proxy()
|
||||||
return r
|
return oauth.get(url, timeout=10, proxies=proxies)
|
||||||
|
|
||||||
|
|
||||||
def api_request(params: CallParams) -> dict[str, typing.Any]:
|
def api_request(params):
|
||||||
"""Make API request and return object parsed from JSON."""
|
return raw_request(params).json()
|
||||||
return typing.cast(dict[str, typing.Any], raw_request(params).json())
|
|
||||||
|
|
||||||
|
|
||||||
def get_token() -> str:
|
def get_token():
|
||||||
"""Get csrftoken from MediaWiki API."""
|
params = {
|
||||||
params: CallParams = {
|
|
||||||
"action": "query",
|
"action": "query",
|
||||||
"meta": "tokens",
|
"meta": "tokens",
|
||||||
"format": "json",
|
"format": "json",
|
||||||
"formatversion": 2,
|
"formatversion": 2,
|
||||||
}
|
}
|
||||||
reply = api_request(params)
|
reply = api_request(params)
|
||||||
token: str = reply["query"]["tokens"]["csrftoken"]
|
token = reply["query"]["tokens"]["csrftoken"]
|
||||||
|
|
||||||
return token
|
return token
|
||||||
|
|
||||||
|
|
||||||
def userinfo_call() -> dict[str, typing.Any]:
|
def userinfo_call():
|
||||||
"""Request user information via OAuth."""
|
"""Request user information via OAuth."""
|
||||||
params: CallParams = {"action": "query", "meta": "userinfo", "format": "json"}
|
params = {"action": "query", "meta": "userinfo", "format": "json"}
|
||||||
return api_request(params)
|
return api_request(params)
|
||||||
|
|
||||||
|
|
||||||
def get_username() -> str | None:
|
def get_username():
|
||||||
"""Get username for current user."""
|
|
||||||
if "owner_key" not in session:
|
if "owner_key" not in session:
|
||||||
return None # not authorized
|
return # not authorized
|
||||||
|
|
||||||
if "username" in session:
|
if "username" in session:
|
||||||
assert isinstance(session["username"], str)
|
|
||||||
return session["username"]
|
return session["username"]
|
||||||
|
|
||||||
reply = userinfo_call()
|
reply = userinfo_call()
|
||||||
if "query" not in reply:
|
if "query" not in reply:
|
||||||
return None
|
return
|
||||||
username = reply["query"]["userinfo"]["name"]
|
session["username"] = reply["query"]["userinfo"]["name"]
|
||||||
assert isinstance(username, str)
|
|
||||||
session["username"] = username
|
|
||||||
|
|
||||||
return username
|
return session["username"]
|
||||||
|
|
|
@ -7,17 +7,24 @@
|
||||||
{% block content %}
|
{% block content %}
|
||||||
<div class="p-2">
|
<div class="p-2">
|
||||||
|
|
||||||
<h1>Software error: {{ exception_type }}</h1>
|
<h1>Software error: {{ tb.exception_type }}</h1>
|
||||||
<div>
|
<div>
|
||||||
<pre>{{ exception }}</pre>
|
<pre>{{ tb.exception }}</pre>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
{% set body %}
|
||||||
|
URL: {{ request.url }}
|
||||||
|
|
||||||
|
{{ tb.plaintext | safe }}
|
||||||
|
{% endset %}
|
||||||
|
|
||||||
|
<p><a class="btn btn-primary btn-lg" role="button" href="https://github.com/EdwardBetts/dab-mechanic/issues/new?title={{ tb.exception + " " + request.url | urlencode }}&body={{ body | urlencode }}">Submit as an issue on GitHub</a> (requires an account with GitHub)</p>
|
||||||
|
|
||||||
<h2 class="traceback">Traceback <em>(most recent call last)</em></h2>
|
<h2 class="traceback">Traceback <em>(most recent call last)</em></h2>
|
||||||
{{ summary | safe }}
|
{{ tb.render_summary(include_title=False) | safe }}
|
||||||
|
|
||||||
<p>Error in function "{{ last_frame.f_code.co_name }}": {{ last_frame_args | pprint }}</p>
|
|
||||||
<pre>{{ last_frame.f_locals | pprint }}</pre>
|
|
||||||
|
|
||||||
|
<p>Error in function "{{ last_frame.function_name }}": {{ last_frame_args | pprint }}</p>
|
||||||
|
<pre>{{ last_frame.locals | pprint }}</pre>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{% endblock %}
|
{% endblock %}
|
||||||
|
|
54
web_view.py
54
web_view.py
|
@ -3,8 +3,6 @@
|
||||||
import inspect
|
import inspect
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
import sys
|
|
||||||
import traceback
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
|
@ -12,7 +10,7 @@ import lxml.html
|
||||||
import requests
|
import requests
|
||||||
import werkzeug.exceptions
|
import werkzeug.exceptions
|
||||||
from requests_oauthlib import OAuth1Session
|
from requests_oauthlib import OAuth1Session
|
||||||
from werkzeug.debug.tbtools import DebugTraceback
|
from werkzeug.debug.tbtools import get_current_traceback
|
||||||
from werkzeug.wrappers import Response
|
from werkzeug.wrappers import Response
|
||||||
|
|
||||||
from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia
|
from dab_mechanic import mediawiki_api, wikidata_oauth, wikipedia
|
||||||
|
@ -28,31 +26,20 @@ awdl_url = "https://dplbot.toolforge.org/articles_with_dab_links.php"
|
||||||
|
|
||||||
|
|
||||||
@app.before_request
|
@app.before_request
|
||||||
def global_user() -> None:
|
def global_user():
|
||||||
"""Make username available everywhere."""
|
"""Make username available everywhere."""
|
||||||
flask.g.user = wikidata_oauth.get_username()
|
flask.g.user = wikidata_oauth.get_username()
|
||||||
|
|
||||||
|
|
||||||
@app.errorhandler(werkzeug.exceptions.InternalServerError)
|
@app.errorhandler(werkzeug.exceptions.InternalServerError)
|
||||||
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
|
def exception_handler(e):
|
||||||
"""Handle exception."""
|
tb = get_current_traceback()
|
||||||
exec_type, exc_value, current_traceback = sys.exc_info()
|
last_frame = next(frame for frame in reversed(tb.frames) if not frame.is_library)
|
||||||
assert exc_value
|
last_frame_args = inspect.getargs(last_frame.code)
|
||||||
tb = DebugTraceback(exc_value)
|
|
||||||
|
|
||||||
summary = tb.render_traceback_html(include_title=False)
|
|
||||||
exc_lines = "".join(tb._te.format_exception_only())
|
|
||||||
|
|
||||||
last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
|
|
||||||
last_frame_args = inspect.getargs(last_frame.f_code)
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
flask.render_template(
|
flask.render_template(
|
||||||
"show_error.html",
|
"show_error.html",
|
||||||
plaintext=tb.render_traceback_text(),
|
tb=tb,
|
||||||
exception=exc_lines,
|
|
||||||
exception_type=tb._te.exc_type.__name__,
|
|
||||||
summary=summary,
|
|
||||||
last_frame=last_frame,
|
last_frame=last_frame,
|
||||||
last_frame_args=last_frame_args,
|
last_frame_args=last_frame_args,
|
||||||
),
|
),
|
||||||
|
@ -60,15 +47,14 @@ def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str, int]]:
|
def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, int]]:
|
||||||
"""Parse Articles With Multiple Dablinks."""
|
"""Parse Articles With Multiple Dablinks."""
|
||||||
articles = []
|
articles = []
|
||||||
table = root.find(".//table")
|
table = root.find(".//table")
|
||||||
assert table is not None
|
|
||||||
for tr in table:
|
for tr in table:
|
||||||
title = tr[0][0].text
|
title = tr[0][0].text
|
||||||
count_text = tr[1][0].text
|
count_text = tr[1][0].text
|
||||||
assert title and count_text and count_text.endswith(" links")
|
assert count_text.endswith(" links")
|
||||||
count = int(count_text[:-6])
|
count = int(count_text[:-6])
|
||||||
|
|
||||||
articles.append((title, count))
|
articles.append((title, count))
|
||||||
|
@ -77,8 +63,7 @@ def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str
|
||||||
|
|
||||||
|
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
def index() -> str:
|
def index():
|
||||||
"""Index page."""
|
|
||||||
r = requests.get(awdl_url, params={"limit": 100})
|
r = requests.get(awdl_url, params={"limit": 100})
|
||||||
root = lxml.html.fromstring(r.content)
|
root = lxml.html.fromstring(r.content)
|
||||||
articles = parse_articles_with_dab_links(root)
|
articles = parse_articles_with_dab_links(root)
|
||||||
|
@ -140,17 +125,17 @@ def save(enwiki: str) -> Response | str:
|
||||||
|
|
||||||
def redirect_if_needed(enwiki: str) -> Optional[Response]:
|
def redirect_if_needed(enwiki: str) -> Optional[Response]:
|
||||||
"""Check if there are spaces in the article name and redirect."""
|
"""Check if there are spaces in the article name and redirect."""
|
||||||
endpoint = flask.request.endpoint
|
|
||||||
assert endpoint
|
|
||||||
return (
|
return (
|
||||||
flask.redirect(flask.url_for(endpoint, enwiki=enwiki.replace(" ", "_")))
|
flask.redirect(
|
||||||
|
flask.url_for(flask.request.endpoint, enwiki=enwiki.replace(" ", "_"))
|
||||||
|
)
|
||||||
if " " in enwiki
|
if " " in enwiki
|
||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/enwiki/<path:enwiki>")
|
@app.route("/enwiki/<path:enwiki>")
|
||||||
def article_page(enwiki: str) -> Response | str:
|
def article_page(enwiki: str) -> Response:
|
||||||
"""Article Page."""
|
"""Article Page."""
|
||||||
redirect = redirect_if_needed(enwiki)
|
redirect = redirect_if_needed(enwiki)
|
||||||
if redirect:
|
if redirect:
|
||||||
|
@ -166,8 +151,7 @@ def article_page(enwiki: str) -> Response | str:
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/start")
|
@app.route("/oauth/start")
|
||||||
def start_oauth() -> Response:
|
def start_oauth():
|
||||||
"""Start OAuth."""
|
|
||||||
next_page = flask.request.args.get("next")
|
next_page = flask.request.args.get("next")
|
||||||
if next_page:
|
if next_page:
|
||||||
flask.session["after_login"] = next_page
|
flask.session["after_login"] = next_page
|
||||||
|
@ -190,8 +174,7 @@ def start_oauth() -> Response:
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/callback", methods=["GET"])
|
@app.route("/oauth/callback", methods=["GET"])
|
||||||
def oauth_callback() -> Response:
|
def oauth_callback():
|
||||||
"""Autentication callback."""
|
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
client_secret = app.config["CLIENT_SECRET"]
|
client_secret = app.config["CLIENT_SECRET"]
|
||||||
|
|
||||||
|
@ -218,12 +201,11 @@ def oauth_callback() -> Response:
|
||||||
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
||||||
|
|
||||||
next_page = flask.session.get("after_login")
|
next_page = flask.session.get("after_login")
|
||||||
return flask.redirect(next_page if next_page else flask.url_for("index"))
|
return flask.redirect(next_page) if next_page else flask.url_for("index")
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/disconnect")
|
@app.route("/oauth/disconnect")
|
||||||
def oauth_disconnect() -> Response:
|
def oauth_disconnect():
|
||||||
"""Disconnect OAuth."""
|
|
||||||
for key in "owner_key", "owner_secret", "username", "after_login":
|
for key in "owner_key", "owner_secret", "username", "after_login":
|
||||||
if key in flask.session:
|
if key in flask.session:
|
||||||
del flask.session[key]
|
del flask.session[key]
|
||||||
|
|
Loading…
Reference in a new issue