#!/usr/bin/python3
import html
import itertools
import json
import re
import sys
import typing
import flask
import werkzeug
from requests_oauthlib import OAuth1Session
from requests_oauthlib.oauth1_session import TokenRequestDenied
from werkzeug.wrappers.response import Response
from add_links import api, core, mediawiki_api, mediawiki_oauth
from add_links.match import NoMatch, get_diff, get_match
# Flask application object; settings are loaded from the "config.default" object.
app = flask.Flask(__name__)
app.config.from_object("config.default")
# NOTE(review): debug mode is switched on unconditionally here, after the
# config object has been loaded. Flask's debug mode enables the interactive
# debugger, so confirm this is never what runs in production.
app.debug = True
# Hostname of the wiki this tool works against, and the endpoint URLs built
# from it.
wiki_hostname = "en.wikipedia.org"
wiki_api_php = f"https://{wiki_hostname}/w/api.php"
wiki_index_php = f"https://{wiki_hostname}/w/index.php"
class Hit(typing.TypedDict):
    """Candidate article: one entry of the ``search`` list of a search result.

    ``search_no_link`` returns a list of these taken straight from the search
    API reply.
    """

    # NOTE(review): the per-field meanings below are inferred from the field
    # names, which look like MediaWiki ``list=search`` result keys — confirm.
    ns: int  # presumably the namespace number of the page
    title: str  # page title; compared against article titles in article_page
    pageid: int  # presumably the page identifier
    size: int  # presumably the page size
    wordcount: int  # presumably the number of words on the page
    snippet: str  # presumably the highlighted text excerpt for the match
    timestamp: str  # presumably the time of the last edit
def load_examples() -> list[dict[str, str | int]]:
"""Load examples."""
return [json.loads(line) for line in open("examples")]
def article_title_to_search_query(title: str) -> str:
    """
    Build a quoted search query from a Wikipedia article title.

    A title that ends in a parenthesised disambiguator, for example
    ``Python (programming language)``, is split into its main part and the
    disambiguation text. Each part is wrapped in double quotes and the two are
    joined with ``AND``. Any other title is wrapped in double quotes as a whole.

    Args:
        title (str): Article title, optionally ending in " (disambiguation)".

    Returns:
        str: ``'"[main title]" AND "[disambiguation text]"'`` when the title
            carries a disambiguator, otherwise ``'"[title]"'``.

    Example:
        >>> article_title_to_search_query("Python (programming language)")
        '"Python" AND "programming language"'
        >>> article_title_to_search_query("London")
        '"London"'
    """
    disambiguated = re.match(r"^(.*) \((.*)\)$", title)
    if disambiguated is None:
        return f'"{title}"'
    # The first group is greedy, so only the last parenthesised part is split off.
    return f'"{disambiguated.group(1)}" AND "{disambiguated.group(2)}"'
def run_search(q: str, limit: int | str = "max") -> dict[str, typing.Any]:
    """Run a text search and return the ``query`` part of the API reply.

    Args:
        q: Search expression, sent as ``srsearch``.
        limit: Value sent as ``srlimit``.
    """
    reply = api.api_get(
        {"list": "search", "srwhat": "text", "srlimit": limit, "srsearch": q}
    )
    return typing.cast(dict[str, typing.Any], reply["query"])
def article_url(title: str) -> str:
    """Return the URL of the ``article_page`` view for an article title."""
    # Spaces become underscores in the URL form of the title.
    url_title = title.replace(" ", "_")
    return flask.url_for("article_page", url_title=url_title)
def get_hit_count(q: str) -> int:
    """Return the ``totalhits`` figure the search reports for a query."""
    search_info = run_search(q, limit=0)["searchinfo"]
    return typing.cast(int, search_info["totalhits"])
def search_count(q: str) -> int:
    """Count search hits for an article title, minus one."""
    hit_count = get_hit_count(article_title_to_search_query(q))
    # One hit is discounted — presumably the article's own page; confirm.
    return hit_count - 1
def search_count_with_link(q: str) -> int:
    """Count pages that match the title query and also link to the article."""
    title_query = article_title_to_search_query(q)
    return get_hit_count(title_query + f' linksto:"{q}"')
def search_no_link(q: str) -> tuple[int, list[Hit]]:
    """Find pages that match the title query but do not link to the article.

    Returns:
        A ``(total hits, hits)`` pair taken from the search reply.
    """
    search_terms = article_title_to_search_query(q) + f' -linksto:"{q}"'
    result = run_search(search_terms, "max")
    total_hits = result["searchinfo"]["totalhits"]
    return (total_hits, result["search"])
@app.before_request
def global_user() -> None:
    """Store the username and the OAuth session on ``flask.g`` for each request."""
    username = mediawiki_oauth.get_username()
    flask.g.user = username
    oauth_session = mediawiki_oauth.get_oauth_session()
    flask.g.oauth_session = oauth_session
@app.route("/")
def index() -> str | Response:
    """Serve the index page.

    Requests carrying OAuth callback parameters are forwarded to
    ``oauth_callback``, and a non-blank ``q`` parameter redirects to the
    matching article page. Otherwise the index template is rendered; the
    examples are only loaded when the ``debug`` parameter is set.
    """
    args = flask.request.args
    if "oauth_verifier" in args and "oauth_token" in args:
        callback_url = flask.url_for("oauth_callback", **args)  # type: ignore
        return flask.redirect(callback_url)

    q = args.get("q")
    q_trimmed = q.strip() if q else ""
    if q_trimmed:
        return flask.redirect(article_url(q_trimmed))

    def linked_share(example: dict[str, str | int]) -> float:
        """Fraction of an example's total that already has links."""
        return float(example["with_links"]) / float(example["total"])

    debug = args.get("debug")
    examples: list[dict[str, str | int]] = []
    if debug:
        # Highest share of linked mentions first.
        examples = sorted(load_examples(), key=linked_share, reverse=True)
    return flask.render_template(
        "index.html", examples=examples, article_url=article_url, debug=debug
    )
def case_flip(s: str) -> str:
    """
    Flip the case of a single character.

    A lowercase character is returned in uppercase and an uppercase character
    in lowercase. Anything that is neither is handed back unchanged.

    Args:
        s (str): A single character string.

    Returns:
        str: The character in the opposite case, or the input itself when it
            is not a letter.

    Example:
        >>> case_flip('a')
        'A'
        >>> case_flip('A')
        'a'
        >>> case_flip('1')
        '1'
    """
    if s.islower():
        flipped = s.upper()
    elif s.isupper():
        flipped = s.lower()
    else:
        flipped = s
    return flipped
def case_flip_first(s: str) -> str:
    """Switch the case of the first character in a string.

    Args:
        s: Any string, including the empty string.

    Returns:
        ``s`` with the case of its first character flipped by ``case_flip``.
        An empty string is returned unchanged.
    """
    # Indexing s[0] on an empty string would raise IndexError.
    if not s:
        return s
    return case_flip(s[0]) + s[1:]
def tidy_snippet(snippet: str) -> str:
    """Turn a search-result snippet into plain text.

    En dashes are replaced by hyphens, the highlight markup that the search
    API wraps around matched terms is removed, and HTML entities are decoded.

    Args:
        snippet: Snippet text as returned by the search API.

    Returns:
        The snippet without highlight markup and with entities unescaped.
    """
    snippet = snippet.replace("\u2013", "-")
    # Strip the <span class="searchmatch">…</span> wrappers around matches.
    snippet = snippet.replace('<span class="searchmatch">', "")
    snippet = snippet.replace("</span>", "")
    # Entities are decoded last so that escaped page text such as "&lt;span&gt;"
    # is not mistaken for markup.
    return html.unescape(snippet)
@app.route("/oauth/start")
def start_oauth() -> Response:
    """Start the OAuth handshake with the wiki.

    Fetches a request token, stores it in the session as ``owner_key`` and
    ``owner_secret``, and redirects the browser to the wiki's authorization
    page. An optional ``next`` query parameter is stored in the session as
    ``after_login``.

    Returns:
        A redirect to the authorization page, or the error page with status
        502 when the request token is denied or incomplete.
    """
    next_page = flask.request.args.get("next")
    if next_page:
        # NOTE(review): ``next`` is caller-supplied and oauth_callback later
        # redirects to it — consider restricting it to same-site URLs.
        flask.session["after_login"] = next_page

    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]
    request_token_url = wiki_index_php + "?title=Special%3aOAuth%2finitiate"
    oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob")
    oauth.headers.update({"User-Agent": api.ua})
    try:
        fetch_response = oauth.fetch_request_token(request_token_url)
    except TokenRequestDenied as e:
        return flask.make_response(
            flask.render_template("error.html", message=str(e)), 502
        )

    owner_key = fetch_response.get("oauth_token")
    owner_secret = fetch_response.get("oauth_token_secret")
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O`` and would let a handshake continue with missing tokens.
    if not (owner_key and owner_secret):
        return flask.make_response(
            flask.render_template(
                "error.html",
                message="OAuth request token response is missing the token or its secret",
            ),
            502,
        )
    flask.session["owner_key"] = owner_key
    flask.session["owner_secret"] = owner_secret

    base_authorization_url = f"https://{wiki_hostname}/wiki/Special:OAuth/authorize"
    authorization_url = oauth.authorization_url(
        base_authorization_url, oauth_consumer_key=client_key
    )
    return flask.redirect(authorization_url)
@app.route("/oauth/callback", methods=["GET"])
def oauth_callback() -> werkzeug.wrappers.response.Response:
    """Finish the OAuth handshake.

    Exchanges the request token kept in the session, together with the
    verifier from the callback URL, for an access token. The access token
    replaces ``owner_key`` and ``owner_secret`` in the session.

    Returns:
        A redirect to the page stored as ``after_login`` (or to the index
        page). The error page is returned with status 400 when the session
        holds no request token, or 502 when the access token is denied.
    """
    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]
    owner_key = flask.session.get("owner_key")
    owner_secret = flask.session.get("owner_secret")
    # Without a request token in the session there is no handshake to finish;
    # answer with an error page rather than failing on a missing session key.
    if not (owner_key and owner_secret):
        return flask.make_response(
            flask.render_template(
                "error.html", message="No OAuth login in progress for this session"
            ),
            400,
        )

    # First session: only used to pull the verifier out of the callback URL.
    oauth = OAuth1Session(
        client_key,
        client_secret=client_secret,
        resource_owner_key=owner_key,
        resource_owner_secret=owner_secret,
    )
    oauth_response = oauth.parse_authorization_response(flask.request.url)
    verifier = oauth_response.get("oauth_verifier")

    access_token_url = wiki_index_php + "?title=Special%3aOAuth%2ftoken"
    # Second session: carries the verifier for the access-token exchange.
    oauth = OAuth1Session(
        client_key,
        client_secret=client_secret,
        resource_owner_key=owner_key,
        resource_owner_secret=owner_secret,
        verifier=verifier,
    )
    oauth.headers.update({"User-Agent": api.ua})
    try:
        oauth_tokens = oauth.fetch_access_token(access_token_url)
    except TokenRequestDenied as e:
        # Same handling as a denied request token in start_oauth.
        return flask.make_response(
            flask.render_template("error.html", message=str(e)), 502
        )

    flask.session["owner_key"] = oauth_tokens.get("oauth_token")
    flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")

    username = mediawiki_oauth.get_username()
    print(f"login successful: {username}", file=sys.stderr)

    # Pop rather than read, so a stale target is not reused by a later login
    # that was started without a ``next`` page.
    next_page = flask.session.pop("after_login", None)
    return flask.redirect(next_page if next_page else flask.url_for("index"))
@app.route("/oauth/disconnect")
def oauth_disconnect() -> werkzeug.wrappers.response.Response:
    """Drop the OAuth-related session entries and return to the index page."""
    oauth_keys = ("owner_key", "owner_secret", "username", "after_login")
    present = [key for key in oauth_keys if key in flask.session]
    for key in present:
        del flask.session[key]
    return flask.redirect(flask.url_for("index"))
def match_type(q: str, snippet: str) -> str | None:
    """Discover match type: 'exact', 'case_mismatch' or None.

    En dashes in the query are replaced by hyphens and the snippet is passed
    through ``tidy_snippet`` before the two are compared.

    >>> match_type('foo', 'foo')
    'exact'
    >>> match_type('foo', 'bar') is None
    True
    >>> match_type('bar', 'foo bar baz')
    'exact'
    >>> match_type('clean coal technology', 'foo clean coal technologies baz')
    'exact'
    >>> match_type('bar', 'foo Bar baz')
    'exact'
    >>> match_type('bar', 'foo BAR baz')
    'case_mismatch'
    >>> match_type('foo-bar', 'aa foo-bar cc')
    'exact'
    >>> match_type(u'foo\u2013bar', 'aa foo-bar cc')
    'exact'
    """
    q = q.replace("\u2013", "-")
    snippet = tidy_snippet(snippet)
    # Verbatim occurrence, or one that differs only in the case of the first
    # character.
    if q in snippet or case_flip_first(q) in snippet:
        return "exact"
    match = None
    if q.lower() in snippet.lower():
        match = "case_mismatch"
    # ``match`` is either None or "case_mismatch" here, so the first half of
    # this condition always holds; only the trailing "y" test decides.
    if match != "exact" and q.endswith("y"):
        # Retry without the final "y" so that "technology" is also found in
        # "technologies".
        if q[:-1] in snippet or case_flip_first(q[:-1]) in snippet:
            return "exact"
    elif match is None:
        # Reached only when q does not end in "y" and nothing matched so far:
        # compare q without its last character, ignoring case.
        # NOTE(review): confirm this branch is meant to pair with the
        # "ends with y" check above rather than with the check nested in it.
        if q[:-1].lower() in snippet.lower():
            match = "case_mismatch"
    return match
def _record_skip(from_title: str, hit_title: str) -> None:
    """Remember in the session that a candidate was handled for an article.

    The session's ``skipped`` mapping goes from article title to the list of
    candidate titles already dealt with. A candidate that is already listed
    is not added again.
    """
    skipped: dict[str, list[str]] = flask.session.get("skipped", {})
    already_handled = skipped.get(from_title, [])
    if hit_title in already_handled:
        return
    skipped[from_title] = already_handled + [hit_title]
    flask.session["skipped"] = skipped
    # Flag the change explicitly, since the mapping is mutated in place.
    flask.session.modified = True
def handle_post(url_title: str) -> Response:
    """Save the link chosen in the posted form, then move to the next candidate.

    The candidate title is read from the ``hit`` form field. A missing login
    redirects to ``start_oauth`` and an API error is reported with status 502.
    After a successful save the session's ``saves`` counter is incremented,
    the candidate is recorded as handled, and the browser is sent back to the
    article page.
    """
    from_title = url_title.replace("_", " ").strip()
    hit_title = flask.request.form["hit"]
    try:
        do_save(from_title, hit_title)
    except mediawiki_oauth.LoginNeeded:
        return flask.redirect(flask.url_for("start_oauth"))
    except mediawiki_api.APIError as e:
        return flask.make_response(f"Save failed: {e}", 502)
    else:
        saves_so_far = flask.session.get("saves", 0)
        flask.session["saves"] = saves_so_far + 1
        _record_skip(from_title, hit_title)
        next_url = flask.url_for("article_page", url_title=url_title, after=hit_title)
        return flask.redirect(next_url)
@app.route("/link/<path:url_title>", methods=["GET", "POST"])
def article_page(url_title: str) -> str | Response:
    """Show the candidate pages that mention an article without linking to it.

    POST requests are handed to ``handle_post``. For GET requests the wiki is
    searched for mentions of the article title. Candidates already handled in
    this session, and the article itself, are left out.

    Args:
        url_title: Article title in URL form (underscores for spaces). The
            ``path`` converter lets titles contain slashes.

    Query parameters:
        title: Candidate to move to the front of the list.
        after: Candidate to record as handled before listing.

    Returns:
        The rendered article page, the "all done" page when no candidates
        remain, or the error page with status 502 when the search fails.
    """
    if flask.request.method == "POST":
        return handle_post(url_title)

    from_title = url_title.replace("_", " ").strip()
    try:
        total = search_count(from_title)
        with_link = search_count_with_link(from_title)
        _no_link_count, hits = search_no_link(from_title)
    except api.MediawikiError as e:
        return flask.make_response(
            flask.render_template("error.html", message=str(e)), 502
        )

    # Candidates already processed this session
    session_skipped: set[str] = set(
        flask.session.get("skipped", {}).get(from_title, [])
    )

    # If a specific candidate was requested, move it to the front
    title_param = flask.request.args.get("title")
    if title_param:
        requested = [h for h in hits if h["title"] == title_param]
        others = [h for h in hits if h["title"] != title_param]
        hits = requested + others

    # Record and apply explicit skip-past
    after = flask.request.args.get("after")
    if after:
        _record_skip(from_title, after)
        session_skipped.add(after)

    # Build the exclusion set once instead of flipping the title's case for
    # every hit. The article itself is excluded under both spellings of its
    # first letter; an empty title has no first letter to flip.
    excluded = session_skipped | {from_title}
    if from_title:
        excluded.add(case_flip_first(from_title))
    hits = [h for h in hits if h["title"] not in excluded]

    if not hits:
        return flask.render_template("all_done.html")
    return flask.render_template(
        "article.html",
        title=from_title,
        total=total,
        with_link=with_link,
        hits=hits,
        url_title=url_title,
    )
def do_save(title: str, hit_title: str) -> str:
    """Add the link to the candidate page and save the edit on the wiki.

    Args:
        title: Title of the article to link to.
        hit_title: Title of the page that receives the link.

    Returns:
        Whatever ``mediawiki_api.edit_page`` returns for the edit.
    """
    token = mediawiki_oauth.get_token()
    found = get_match(title, hit_title, None)
    edit_args = {
        "pageid": found["pageid"],
        "section": found["section_num"],
        "text": found["section_text"],
        "summary": (
            f"link [[{found['replacement']}]] using [[:en:User:Edward/Find link|Find link]]"
        ),
        "baserevid": found["revid"],
        "token": token,
    }
    return mediawiki_api.edit_page(**edit_args)
@app.route("/saved")
def save_done() -> str:
    """Render the page shown once a save has completed."""
    page = flask.render_template("save_done.html")
    return page
@app.route("/api/1/hits")
def api_hits() -> werkzeug.wrappers.response.Response:
    """Return candidates for the given article title as JSON.

    Query parameters:
        title: Article title to search for (required, non-empty).

    Returns:
        JSON with the ``title`` and the ``hits`` taken from the ``results``
        entry of ``core.do_search``.

    Raises:
        werkzeug.exceptions.BadRequest: If ``title`` is missing or empty.
    """
    title = flask.request.args.get("title")
    # Explicit validation instead of ``assert``: assertions are stripped under
    # ``python -O``, and a bad request deserves a 400 rather than a 500.
    if not title:
        flask.abort(400, description="missing 'title' parameter")
    ret = core.do_search(title)
    return flask.jsonify(title=title, hits=ret["results"])
@app.route("/api/1/valid_hit")
def api_valid_hit() -> werkzeug.wrappers.response.Response:
    """Report as JSON whether a candidate page has a usable unlinked mention.

    Query parameters:
        link_to: Title of the article to link to.
        link_from: Title of the candidate page.

    A candidate with no match is recorded as handled and reported as invalid;
    a wiki error is reported as invalid together with the error text.
    """
    args = flask.request.args
    link_to = args["link_to"]
    link_from = args["link_from"]
    try:
        found = get_diff(link_to, link_from, None)
    except NoMatch:
        _record_skip(link_to, link_from)
        return flask.jsonify(valid=False)
    except api.MediawikiError as e:
        return flask.jsonify(valid=False, error=str(e))
    else:
        return flask.jsonify(
            valid=True, diff=found["diff"], replacement=found["replacement"]
        )
@app.route("/favicon.ico")
def favicon() -> Response:
    """Answer favicon requests with an empty 404 response."""
    not_found = flask.Response(status=404)
    return not_found
# Run the built-in server when the file is executed directly.
# NOTE(review): this listens on all interfaces (0.0.0.0) while ``app.debug`` is
# forced to True earlier in this file — confirm this entry point is only used
# on a trusted development machine.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)