Add types and docstrings.
This commit is contained in:
parent
3d1ee54308
commit
1a479dacbc
|
@ -1,11 +1,17 @@
|
||||||
|
"""Wikidata OAuth."""
|
||||||
|
|
||||||
|
import typing
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
from flask import current_app, session
|
from flask import current_app, session
|
||||||
|
from requests.models import Response
|
||||||
from requests_oauthlib import OAuth1Session
|
from requests_oauthlib import OAuth1Session
|
||||||
|
|
||||||
wiki_hostname = "en.wikipedia.org"
|
wiki_hostname = "en.wikipedia.org"
|
||||||
api_url = f"https://{wiki_hostname}/w/api.php"
|
api_url = f"https://{wiki_hostname}/w/api.php"
|
||||||
|
|
||||||
|
CallParams = dict[str, str | int]
|
||||||
|
|
||||||
|
|
||||||
def get_edit_proxy() -> dict[str, str]:
|
def get_edit_proxy() -> dict[str, str]:
|
||||||
"""Retrieve proxy information from config."""
|
"""Retrieve proxy information from config."""
|
||||||
|
@ -16,7 +22,7 @@ def get_edit_proxy() -> dict[str, str]:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def api_post_request(params: dict[str, str | int]):
|
def api_post_request(params: CallParams) -> Response:
|
||||||
"""HTTP Post using Oauth."""
|
"""HTTP Post using Oauth."""
|
||||||
app = current_app
|
app = current_app
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
|
@ -27,11 +33,12 @@ def api_post_request(params: dict[str, str | int]):
|
||||||
resource_owner_key=session["owner_key"],
|
resource_owner_key=session["owner_key"],
|
||||||
resource_owner_secret=session["owner_secret"],
|
resource_owner_secret=session["owner_secret"],
|
||||||
)
|
)
|
||||||
proxies = get_edit_proxy()
|
r: Response = oauth.post(api_url, data=params, timeout=10, proxies=get_edit_proxy())
|
||||||
return oauth.post(api_url, data=params, timeout=10, proxies=proxies)
|
return r
|
||||||
|
|
||||||
|
|
||||||
def raw_request(params):
|
def raw_request(params: CallParams) -> Response:
|
||||||
|
"""Raw request."""
|
||||||
app = current_app
|
app = current_app
|
||||||
url = api_url + "?" + urlencode(params)
|
url = api_url + "?" + urlencode(params)
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
|
@ -42,43 +49,49 @@ def raw_request(params):
|
||||||
resource_owner_key=session["owner_key"],
|
resource_owner_key=session["owner_key"],
|
||||||
resource_owner_secret=session["owner_secret"],
|
resource_owner_secret=session["owner_secret"],
|
||||||
)
|
)
|
||||||
proxies = get_edit_proxy()
|
r: Response = oauth.get(url, timeout=10, proxies=get_edit_proxy())
|
||||||
return oauth.get(url, timeout=10, proxies=proxies)
|
return r
|
||||||
|
|
||||||
|
|
||||||
def api_request(params):
|
def api_request(params: CallParams) -> dict[str, typing.Any]:
|
||||||
return raw_request(params).json()
|
"""Make API request and return object parsed from JSON."""
|
||||||
|
return typing.cast(dict[str, typing.Any], raw_request(params).json())
|
||||||
|
|
||||||
|
|
||||||
def get_token():
|
def get_token() -> str:
|
||||||
params = {
|
"""Get csrftoken from MediaWiki API."""
|
||||||
|
params: CallParams = {
|
||||||
"action": "query",
|
"action": "query",
|
||||||
"meta": "tokens",
|
"meta": "tokens",
|
||||||
"format": "json",
|
"format": "json",
|
||||||
"formatversion": 2,
|
"formatversion": 2,
|
||||||
}
|
}
|
||||||
reply = api_request(params)
|
reply = api_request(params)
|
||||||
token = reply["query"]["tokens"]["csrftoken"]
|
token: str = reply["query"]["tokens"]["csrftoken"]
|
||||||
|
|
||||||
return token
|
return token
|
||||||
|
|
||||||
|
|
||||||
def userinfo_call():
|
def userinfo_call() -> dict[str, typing.Any]:
|
||||||
"""Request user information via OAuth."""
|
"""Request user information via OAuth."""
|
||||||
params = {"action": "query", "meta": "userinfo", "format": "json"}
|
params: CallParams = {"action": "query", "meta": "userinfo", "format": "json"}
|
||||||
return api_request(params)
|
return api_request(params)
|
||||||
|
|
||||||
|
|
||||||
def get_username():
|
def get_username() -> str | None:
|
||||||
|
"""Get username for current user."""
|
||||||
if "owner_key" not in session:
|
if "owner_key" not in session:
|
||||||
return # not authorized
|
return None # not authorized
|
||||||
|
|
||||||
if "username" in session:
|
if "username" in session:
|
||||||
|
assert isinstance(session["username"], str)
|
||||||
return session["username"]
|
return session["username"]
|
||||||
|
|
||||||
reply = userinfo_call()
|
reply = userinfo_call()
|
||||||
if "query" not in reply:
|
if "query" not in reply:
|
||||||
return
|
return None
|
||||||
session["username"] = reply["query"]["userinfo"]["name"]
|
username = reply["query"]["userinfo"]["name"]
|
||||||
|
assert isinstance(username, str)
|
||||||
|
session["username"] = username
|
||||||
|
|
||||||
return session["username"]
|
return username
|
||||||
|
|
29
web_view.py
29
web_view.py
|
@ -26,7 +26,7 @@ awdl_url = "https://dplbot.toolforge.org/articles_with_dab_links.php"
|
||||||
|
|
||||||
|
|
||||||
@app.before_request
|
@app.before_request
|
||||||
def global_user():
|
def global_user() -> None:
|
||||||
"""Make username available everywhere."""
|
"""Make username available everywhere."""
|
||||||
flask.g.user = wikidata_oauth.get_username()
|
flask.g.user = wikidata_oauth.get_username()
|
||||||
|
|
||||||
|
@ -47,14 +47,15 @@ def exception_handler(e):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, int]]:
|
def parse_articles_with_dab_links(root: lxml.html.HtmlElement) -> list[tuple[str, int]]:
|
||||||
"""Parse Articles With Multiple Dablinks."""
|
"""Parse Articles With Multiple Dablinks."""
|
||||||
articles = []
|
articles = []
|
||||||
table = root.find(".//table")
|
table = root.find(".//table")
|
||||||
|
assert table is not None
|
||||||
for tr in table:
|
for tr in table:
|
||||||
title = tr[0][0].text
|
title = tr[0][0].text
|
||||||
count_text = tr[1][0].text
|
count_text = tr[1][0].text
|
||||||
assert count_text.endswith(" links")
|
assert title and count_text and count_text.endswith(" links")
|
||||||
count = int(count_text[:-6])
|
count = int(count_text[:-6])
|
||||||
|
|
||||||
articles.append((title, count))
|
articles.append((title, count))
|
||||||
|
@ -63,7 +64,8 @@ def parse_articles_with_dab_links(root: lxml.html.Element) -> list[tuple[str, in
|
||||||
|
|
||||||
|
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
def index():
|
def index() -> str:
|
||||||
|
"""Index page."""
|
||||||
r = requests.get(awdl_url, params={"limit": 100})
|
r = requests.get(awdl_url, params={"limit": 100})
|
||||||
root = lxml.html.fromstring(r.content)
|
root = lxml.html.fromstring(r.content)
|
||||||
articles = parse_articles_with_dab_links(root)
|
articles = parse_articles_with_dab_links(root)
|
||||||
|
@ -125,17 +127,17 @@ def save(enwiki: str) -> Response | str:
|
||||||
|
|
||||||
def redirect_if_needed(enwiki: str) -> Optional[Response]:
|
def redirect_if_needed(enwiki: str) -> Optional[Response]:
|
||||||
"""Check if there are spaces in the article name and redirect."""
|
"""Check if there are spaces in the article name and redirect."""
|
||||||
|
endpoint = flask.request.endpoint
|
||||||
|
assert endpoint
|
||||||
return (
|
return (
|
||||||
flask.redirect(
|
flask.redirect(flask.url_for(endpoint, enwiki=enwiki.replace(" ", "_")))
|
||||||
flask.url_for(flask.request.endpoint, enwiki=enwiki.replace(" ", "_"))
|
|
||||||
)
|
|
||||||
if " " in enwiki
|
if " " in enwiki
|
||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/enwiki/<path:enwiki>")
|
@app.route("/enwiki/<path:enwiki>")
|
||||||
def article_page(enwiki: str) -> Response:
|
def article_page(enwiki: str) -> Response | str:
|
||||||
"""Article Page."""
|
"""Article Page."""
|
||||||
redirect = redirect_if_needed(enwiki)
|
redirect = redirect_if_needed(enwiki)
|
||||||
if redirect:
|
if redirect:
|
||||||
|
@ -151,7 +153,8 @@ def article_page(enwiki: str) -> Response:
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/start")
|
@app.route("/oauth/start")
|
||||||
def start_oauth():
|
def start_oauth() -> Response:
|
||||||
|
"""Start OAuth."""
|
||||||
next_page = flask.request.args.get("next")
|
next_page = flask.request.args.get("next")
|
||||||
if next_page:
|
if next_page:
|
||||||
flask.session["after_login"] = next_page
|
flask.session["after_login"] = next_page
|
||||||
|
@ -174,7 +177,8 @@ def start_oauth():
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/callback", methods=["GET"])
|
@app.route("/oauth/callback", methods=["GET"])
|
||||||
def oauth_callback():
|
def oauth_callback() -> Response:
|
||||||
|
"""Autentication callback."""
|
||||||
client_key = app.config["CLIENT_KEY"]
|
client_key = app.config["CLIENT_KEY"]
|
||||||
client_secret = app.config["CLIENT_SECRET"]
|
client_secret = app.config["CLIENT_SECRET"]
|
||||||
|
|
||||||
|
@ -201,11 +205,12 @@ def oauth_callback():
|
||||||
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
||||||
|
|
||||||
next_page = flask.session.get("after_login")
|
next_page = flask.session.get("after_login")
|
||||||
return flask.redirect(next_page) if next_page else flask.url_for("index")
|
return flask.redirect(next_page if next_page else flask.url_for("index"))
|
||||||
|
|
||||||
|
|
||||||
@app.route("/oauth/disconnect")
|
@app.route("/oauth/disconnect")
|
||||||
def oauth_disconnect():
|
def oauth_disconnect() -> Response:
|
||||||
|
"""Disconnect OAuth."""
|
||||||
for key in "owner_key", "owner_secret", "username", "after_login":
|
for key in "owner_key", "owner_secret", "username", "after_login":
|
||||||
if key in flask.session:
|
if key in flask.session:
|
||||||
del flask.session[key]
|
del flask.session[key]
|
||||||
|
|
Loading…
Reference in a new issue