376 lines
11 KiB
Python
Executable file
376 lines
11 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import html
|
|
import itertools
|
|
import json
|
|
import re
|
|
import typing
|
|
|
|
import flask
|
|
import werkzeug
|
|
from requests_oauthlib import OAuth1Session
|
|
from werkzeug.wrappers.response import Response
|
|
|
|
from add_links import api, core, mediawiki_api, wikidata_oauth
|
|
from add_links.match import NoMatch, get_diff, get_match
|
|
|
|
# Flask application object; settings are read from the "config.default" object.
app = flask.Flask(__name__)
app.config.from_object("config.default")
# NOTE(review): debug mode is switched on unconditionally, and the __main__
# block at the bottom of this file binds to every interface — confirm this
# combination is only ever used for local development.
app.debug = True

# Host name used to build the api.php / index.php URLs below; index.php is
# the base of the OAuth endpoints used by the /oauth/* views.
wiki_hostname = "en.wikipedia.org"
wiki_api_php = f"https://{wiki_hostname}/w/api.php"
wiki_index_php = f"https://{wiki_hostname}/w/index.php"
|
|
|
|
|
|
class Hit(typing.TypedDict):
    """Candidate article: one entry of a search result list.

    ``search_no_link`` returns a list of these, taken from the ``"search"``
    key of the search reply.
    """

    ns: int  # presumably the namespace number — confirm against the search API
    title: str  # page title; used as the lookup key for hits in article_page
    pageid: int
    size: int
    wordcount: int
    snippet: str  # excerpt with HTML match markup; cleaned by tidy_snippet
    timestamp: str
|
|
|
|
|
|
# Matches titles of the form "Name (qualifier)": group 1 is the text before
# the final " (", group 2 is the text inside the trailing parentheses.
re_disambig = re.compile(r"^(.*) \((.*)\)$")
|
|
|
|
|
|
def load_examples() -> list[dict[str, str | int]]:
|
|
"""Load examples."""
|
|
return [json.loads(line) for line in open("examples")]
|
|
|
|
|
|
def article_title_to_search_query(title: str) -> str:
    """Build the quoted search expression used to look up *title*.

    A title with a trailing parenthesised qualifier, ``Name (qualifier)``,
    becomes ``"Name" AND "qualifier"``; any other title is simply wrapped in
    double quotes.
    """
    parts = re_disambig.match(title)
    if parts is None:
        return f'"{title}"'
    name, qualifier = parts.group(1), parts.group(2)
    return f'"{name}" AND "{qualifier}"'
|
|
|
|
|
|
def run_search(q: str, limit: int | str = "max") -> dict[str, typing.Any]:
    """Run a text search and return the ``query`` part of the reply.

    Args:
        q: Search expression, sent as ``srsearch``.
        limit: Value sent as ``srlimit``; defaults to ``"max"``.

    Returns:
        The value stored under ``"query"`` in the ``api.api_get`` result.
    """
    params = {
        "list": "search",
        "srwhat": "text",
        "srlimit": limit,
        "srsearch": q,
    }
    reply = api.api_get(params)
    return typing.cast(dict[str, typing.Any], reply["query"])
|
|
|
|
|
|
def article_url(title: str) -> str:
    """Return the URL of the ``article_page`` view for *title*.

    Spaces in the title are replaced by underscores before the URL is built.
    """
    url_title = "_".join(title.split(" "))
    return flask.url_for("article_page", url_title=url_title)
|
|
|
|
|
|
def search_count(q: str) -> int:
    """Return the total number of search hits for article title *q*, minus one.

    NOTE(review): the subtraction presumably discounts the article's own
    page from the count — confirm.
    """
    expression = article_title_to_search_query(q)
    info = run_search(expression, limit=0)["searchinfo"]
    return typing.cast(int, info["totalhits"]) - 1
|
|
|
|
|
|
def search_count_with_link(q: str) -> int:
    """Return the number of search hits for *q* that also match ``linksto:"q"``.

    The search expression is the one used by ``search_count`` with a
    ``linksto:`` filter for the same title appended.
    """
    expression = article_title_to_search_query(q) + f' linksto:"{q}"'
    info = run_search(expression, limit=0)["searchinfo"]
    return typing.cast(int, info["totalhits"])
|
|
|
|
|
|
def search_no_link(q: str) -> tuple[int, list[Hit]]:
    """Search for mentions of *q* on pages excluded by ``-linksto:"q"``.

    Returns:
        A ``(totalhits, results)`` pair: the hit count reported under
        ``searchinfo`` and the list stored under ``search`` in the reply.
    """
    expression = article_title_to_search_query(q) + f' -linksto:"{q}"'
    reply = run_search(expression, "max")
    return (reply["searchinfo"]["totalhits"], reply["search"])
|
|
|
|
|
|
@app.before_request
def global_user() -> None:
    """Store the current username on ``flask.g`` before each request."""
    username = wikidata_oauth.get_username()
    flask.g.user = username
|
|
|
|
|
|
@app.route("/")
def index() -> str | Response:
    """Index page.

    Handles three cases, in this order:

    * Both ``oauth_verifier`` and ``oauth_token`` are in the query string:
      redirect to ``oauth_callback``, forwarding every query argument.
    * A non-blank ``q`` argument is present: redirect to the article page
      for the trimmed value.
    * Otherwise: render ``index.html`` with the examples sorted by
      descending ``with_links / total`` ratio.
    """
    args = flask.request.args
    if "oauth_verifier" in args and "oauth_token" in args:
        url = flask.url_for("oauth_callback", **args)  # type: ignore
        return flask.redirect(url)

    # Settle the redirect before touching the examples file: a request that
    # is about to be redirected never uses the examples.
    if q := args.get("q"):
        if q_trimmed := q.strip():
            return flask.redirect(article_url(q_trimmed))

    def linked_share(example: dict[str, str | int]) -> float:
        """Fraction of mentions that are already links (0.0 if no mentions)."""
        total = float(example["total"])
        # An example with a zero total must not break the whole index page.
        return float(example["with_links"]) / total if total else 0.0

    examples = load_examples()
    examples.sort(key=linked_share, reverse=True)

    return flask.render_template(
        "index.html", examples=examples, article_url=article_url
    )
|
|
|
|
|
|
def case_flip(s: str) -> str:
    """Switch the case of *s* when it is uniformly cased.

    An all-lowercase string is upper-cased and an all-uppercase string is
    lower-cased. Anything else (mixed case, or no cased characters at all)
    is returned unchanged.
    """
    if s.islower():
        return s.upper()
    if s.isupper():
        return s.lower()
    return s


def case_flip_first(s: str) -> str:
    """Switch the case of the first character of *s*.

    The empty string is returned unchanged instead of raising
    ``IndexError`` on the ``s[0]`` lookup.
    """
    if not s:
        return s
    return case_flip(s[0]) + s[1:]
|
|
|
|
|
|
def tidy_snippet(snippet: str) -> str:
    """Turn a search snippet into plain text.

    En dashes become hyphens, the ``searchmatch`` highlighting spans are
    stripped, and HTML entities are decoded last.
    """
    substitutions = (
        ("\u2013", "-"),
        ("</span>", ""),
        ('<span class="searchmatch">', ""),
    )
    for old, new in substitutions:
        snippet = snippet.replace(old, new)
    return html.unescape(snippet)
|
|
|
|
|
|
@app.route("/oauth/start")
def start_oauth() -> Response:
    """Begin the OAuth 1 handshake and redirect to the authorization page.

    An optional ``next`` query argument is kept in the session as
    ``after_login``. The request token and its secret are stored in the
    session as ``owner_key`` / ``owner_secret``.
    """
    if next_page := flask.request.args.get("next"):
        flask.session["after_login"] = next_page

    consumer_key = app.config["CLIENT_KEY"]
    consumer_secret = app.config["CLIENT_SECRET"]

    oauth = OAuth1Session(
        consumer_key, client_secret=consumer_secret, callback_uri="oob"
    )
    request_token = oauth.fetch_request_token(
        wiki_index_php + "?title=Special%3aOAuth%2finitiate"
    )

    flask.session["owner_key"] = request_token.get("oauth_token")
    flask.session["owner_secret"] = request_token.get("oauth_token_secret")

    # Both values must be present for the callback step to work.
    assert flask.session["owner_key"] and flask.session["owner_secret"]

    authorization_url = oauth.authorization_url(
        f"https://{wiki_hostname}/wiki/Special:OAuth/authorize",
        oauth_consumer_key=consumer_key,
    )
    return flask.redirect(authorization_url)
|
|
|
|
|
|
@app.route("/oauth/callback", methods=["GET"])
def oauth_callback() -> werkzeug.wrappers.response.Response:
    """Finish the OAuth 1 handshake.

    Reads the verifier from the callback URL, exchanges the request token
    held in the session for an access token, and stores the new token and
    secret in the session as ``owner_key`` / ``owner_secret``. Then
    redirects to the ``after_login`` URL from the session, or to the index
    page when none is set.
    """
    consumer_key = app.config["CLIENT_KEY"]
    consumer_secret = app.config["CLIENT_SECRET"]

    # First session: only used to pull the verifier out of the callback URL.
    parser = OAuth1Session(
        consumer_key,
        client_secret=consumer_secret,
        resource_owner_key=flask.session.get("owner_key"),
        resource_owner_secret=flask.session.get("owner_secret"),
    )
    verifier = parser.parse_authorization_response(flask.request.url).get(
        "oauth_verifier"
    )

    # Second session: carries the verifier for the access-token exchange.
    exchanger = OAuth1Session(
        consumer_key,
        client_secret=consumer_secret,
        resource_owner_key=flask.session["owner_key"],
        resource_owner_secret=flask.session["owner_secret"],
        verifier=verifier,
    )
    tokens = exchanger.fetch_access_token(
        wiki_index_php + "?title=Special%3aOAuth%2ftoken"
    )
    flask.session["owner_key"] = tokens.get("oauth_token")
    flask.session["owner_secret"] = tokens.get("oauth_token_secret")

    print("login successful")

    target = flask.session.get("after_login") or flask.url_for("index")
    return flask.redirect(target)
|
|
|
|
|
|
@app.route("/oauth/disconnect")
def oauth_disconnect() -> werkzeug.wrappers.response.Response:
    """Drop the OAuth-related session keys and go back to the index page."""
    for key in ("owner_key", "owner_secret", "username", "after_login"):
        # Keys that were never set are simply skipped.
        flask.session.pop(key, None)
    return flask.redirect(flask.url_for("index"))
|
|
|
|
|
|
def match_type(q: str, snippet: str) -> str | None:
    """Discover match type: 'exact', 'case_mismatch' or None.

    The en dash in *q* is normalised to a hyphen and *snippet* is cleaned
    with ``tidy_snippet`` before comparing. A match that differs only in the
    case of the first character still counts as 'exact'. For a *q* ending
    in "y", the stem without that final letter is also tried, so that
    "technology" matches "technologies".

    >>> match_type('foo', 'foo')
    'exact'
    >>> match_type('foo', 'bar') is None
    True
    >>> match_type('bar', 'foo bar baz')
    'exact'
    >>> match_type('clean coal technology', 'foo clean coal technologies baz')
    'exact'
    >>> match_type('bar', 'foo Bar baz')
    'exact'
    >>> match_type('bar', 'foo BAR baz')
    'case_mismatch'
    >>> match_type('foo-bar', 'aa foo-bar cc')
    'exact'
    >>> match_type(u'foo\u2013bar', 'aa foo-bar cc')
    'exact'
    """
    q = q.replace("\u2013", "-")
    snippet = tidy_snippet(snippet)

    # Verbatim match, or match with only the first character's case flipped.
    if q in snippet or case_flip_first(q) in snippet:
        return "exact"
    match = None
    if q.lower() in snippet.lower():
        match = "case_mismatch"
    # `match` can only be None or "case_mismatch" at this point, so the
    # first half of this condition is always true.
    if match != "exact" and q.endswith("y"):
        # Retry with the trailing "y" removed.
        if q[:-1] in snippet or case_flip_first(q[:-1]) in snippet:
            return "exact"
    # NOTE(review): this `elif` is assumed to pair with the outer `if` just
    # above (the other reading pairs it with the inner `if`) — confirm
    # against the canonical copy of this file.
    elif match is None:
        if q[:-1].lower() in snippet.lower():
            match = "case_mismatch"
    return match
|
|
|
|
|
|
class NoGoodHit(Exception):
    """Raised by ``get_best_hit`` when no hit in the list gives a usable match."""
|
|
|
|
|
|
def get_best_hit(title: str, hits: list[Hit]) -> tuple[Hit, dict[str, typing.Any]]:
    """Return the first hit that can actually be linked to *title*.

    Hits are tried in order. A hit is skipped when its title equals *title*
    ignoring case, when its snippet is not an 'exact' match, or when
    ``get_diff`` raises ``NoMatch`` for it.

    Returns:
        The chosen hit together with the ``get_diff`` result for it.

    Raises:
        NoGoodHit: If every hit was skipped.
    """
    wanted = title.lower()
    for candidate in hits:
        is_self = candidate["title"].lower() == wanted
        if is_self or match_type(title, candidate["snippet"]) != "exact":
            continue

        print(f'get diff: {candidate["title"]}, {title}')
        try:
            diff_info = get_diff(title, candidate["title"], None)
        except NoMatch:
            print("no match")
        else:
            return (candidate, diff_info)

    raise NoGoodHit
|
|
|
|
|
|
@app.route("/link/<path:url_title>", methods=["GET", "POST"])
def article_page(url_title: str) -> str | Response:
    """Article page: propose one link to add, or save the proposed link.

    POST saves the link for the page named in the ``hit`` form field and
    redirects back here with ``after`` set to that page. GET looks for the
    next page to edit: the one named by the ``title`` argument if it is among
    the search results, otherwise the best hit following the ``after`` page.
    """
    from_title = url_title.replace("_", " ").strip()

    if flask.request.method == "POST":
        hit_title = flask.request.form["hit"]
        do_save(from_title, hit_title)
        next_url = flask.url_for(
            "article_page", url_title=url_title, after=hit_title
        )
        return flask.redirect(next_url)

    requested_title = flask.request.args.get("title")

    total = search_count(from_title)
    with_link = search_count_with_link(from_title)
    _no_link_count, hits = search_no_link(from_title)

    hits_by_title = {entry["title"]: entry for entry in hits}

    if requested_title in hits_by_title:
        hit = hits_by_title[requested_title]
        found = get_diff(from_title, hit["title"], None)
    else:
        if after := flask.request.args.get("after"):
            print(after)
            remaining = itertools.dropwhile(
                lambda entry: entry["title"] != after, hits
            )
            # Consume the "after" hit itself. If it is not in the list the
            # iterator is already exhausted and the full list is kept.
            if next(remaining, None):
                hits = list(remaining)

        try:
            hit, found = get_best_hit(from_title, hits)
        except NoGoodHit:
            return flask.render_template("all_done.html")

    return flask.render_template(
        "article2.html",
        title=from_title,
        total=total,
        with_link=with_link,
        hit_title=hit["title"],
        hits=hits,
        replacement=found["replacement"],
        diff=found["diff"],
        found=found,
        url_title=url_title,
    )
|
|
|
|
|
|
def do_save(title: str, hit_title: str) -> str:
    """Save the new link to *title* on the page *hit_title*.

    The edit replaces the section reported by ``get_match``, based on the
    revision it reports, with an edit summary naming the replacement link.

    Returns:
        Whatever ``mediawiki_api.edit_page`` returns.
    """
    token = wikidata_oauth.get_token()
    found = get_match(title, hit_title, None)

    replacement = found["replacement"]
    summary = f"link [[{replacement}]] using [[:en:User:Edward/Find link|Find link]]"

    return mediawiki_api.edit_page(
        pageid=found["pageid"],
        section=found["section_num"],
        text=found["section_text"],
        summary=summary,
        baserevid=found["revid"],
        token=token,
    )
|
|
|
|
|
|
@app.route("/saved")
def save_done() -> str:
    """Render the ``save_done.html`` confirmation page."""
    page = flask.render_template("save_done.html")
    return page
|
|
|
|
|
|
@app.route("/api/1/hits")
def api_hits() -> werkzeug.wrappers.response.Response:
    """Return candidates for the given article title as JSON.

    Reads the ``title`` query argument and responds with
    ``{"title": ..., "hits": ...}``, where the hits are the ``"results"``
    of ``core.do_search``. A missing or empty ``title`` gets an HTTP 400.
    """
    title = flask.request.args.get("title")
    if not title:
        # A bad request is the client's error, not an internal failure. An
        # `assert` would turn it into a 500, and would not run at all under
        # `python -O`.
        flask.abort(400, description="missing 'title' parameter")

    ret = core.do_search(title)
    return flask.jsonify(title=title, hits=ret["results"])
|
|
|
|
# mock_hits: list[Hit] = json.load(open("sample.json"))
|
|
# return flask.jsonify(title=title, hits=mock_hits)
|
|
|
|
|
|
@app.route("/api/1/valid_hit")
def api_valid_hit() -> werkzeug.wrappers.response.Response:
    """Report as JSON whether ``get_diff`` finds a match for the given pair.

    Query arguments ``link_from`` and ``link_to`` are both required and are
    passed to ``get_diff`` as ``(link_to, link_from)``.

    Responds with ``{"valid": false}`` when ``get_diff`` raises ``NoMatch``,
    otherwise with ``{"valid": true, "diff": ..., "replacement": ...}``.
    """
    link_from = flask.request.args["link_from"]
    link_to = flask.request.args["link_to"]

    try:
        found = get_diff(link_to, link_from, None)
    except NoMatch:
        return flask.jsonify(valid=False)

    # get_diff's result is used as a mapping everywhere else in this module
    # (found["diff"], found["replacement"]). Unpacking it as a 2-tuple would
    # yield its keys, not its values.
    return flask.jsonify(
        valid=True, diff=found["diff"], replacement=found["replacement"]
    )
|
|
|
|
|
|
@app.route("/favicon.ico")
def favicon() -> Response:
    """Answer favicon requests with an empty 404 response."""
    not_found = flask.Response(status=404)
    return not_found
|
|
|
|
|
|
# Script entry point: serve the app on every interface, port 8000.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
|