#!/usr/bin/python3
"""Flask web app for finding unlinked mentions of a Wikipedia article and linking them."""

import html
import itertools
import json
import re
import sys
import typing
from urllib.parse import urlsplit

import flask
import werkzeug
from requests_oauthlib import OAuth1Session
from requests_oauthlib.oauth1_session import TokenRequestDenied
from werkzeug.wrappers.response import Response

from add_links import api, core, mediawiki_api, mediawiki_oauth
from add_links.match import NoMatch, get_diff, get_match

app = flask.Flask(__name__)
app.config.from_object("config.default")
# NOTE(review): debug is forced on regardless of config; combined with
# app.run(host="0.0.0.0") below this exposes the Werkzeug debugger to the
# network. Confirm this module is never run directly in production.
app.debug = True

wiki_hostname = "en.wikipedia.org"
wiki_api_php = f"https://{wiki_hostname}/w/api.php"
wiki_index_php = f"https://{wiki_hostname}/w/index.php"


class Hit(typing.TypedDict):
    """Candidate articles."""

    ns: int
    title: str
    pageid: int
    size: int
    wordcount: int
    snippet: str
    timestamp: str


def _error_page(message: str, status: int = 502) -> Response:
    """Render the error template with the given message and HTTP status."""
    return flask.make_response(
        flask.render_template("error.html", message=message), status
    )


def _is_safe_redirect_target(target: str) -> bool:
    """Return True if target is a URL on this site.

    Used to validate the user-supplied post-login destination so that the
    login flow cannot be abused to bounce users to another site.
    """
    parts = urlsplit(target)
    if parts.scheme or parts.netloc:
        # Absolute URL: only accept our own host over http(s).
        return (
            parts.scheme in ("http", "https") and parts.netloc == flask.request.host
        )
    # Relative URL: must be an absolute path, and not "//host" or "/\host",
    # which browsers treat as a different host.
    return target.startswith("/") and not target.startswith(("//", "/\\"))


def load_examples() -> list[dict[str, str | int]]:
    """Load examples from the 'examples' file, one JSON object per line."""
    with open("examples") as f:
        # Blank lines (such as a trailing newline) are skipped, not parsed.
        return [json.loads(line) for line in f if line.strip()]


def article_title_to_search_query(title: str) -> str:
    """
    Convert a Wikipedia article title to a search query string.

    This function takes a Wikipedia article title and parses it to create a
    search query. If the title contains disambiguation text in parentheses, it
    separates the main title and the disambiguation text and formats them into
    a search query using an AND operator. If there's no disambiguation text,
    the title is used as is for the search query.

    The search query is formatted such that the main title and the
    disambiguation text (if present) are enclosed in double quotes and
    connected with 'AND'. This format is useful for precise search engine
    queries.

    Args:
        title (str): The Wikipedia article title, possibly including
            disambiguation text in parentheses.

    Returns:
        str: A formatted search query string. If disambiguation text is
            present, returns '"[main title]" AND "[disambiguation text]"'.
            Otherwise, returns '"[title]"'.

    Example:
        >>> article_title_to_search_query("Python (programming language)")
        '"Python" AND "programming language"'

        >>> article_title_to_search_query("London")
        '"London"'
    """
    m = re.match(r"^(.*) \((.*)\)$", title)
    return f'"{m.group(1)}" AND "{m.group(2)}"' if m else f'"{title}"'


def run_search(q: str, limit: int | str = "max") -> dict[str, typing.Any]:
    """Search Wikipedia and return the 'query' part of the API response."""
    params = {"list": "search", "srwhat": "text", "srlimit": limit, "srsearch": q}
    return typing.cast(dict[str, typing.Any], api.api_get(params)["query"])


def article_url(title: str) -> str:
    """URL of the article page in this app for the given article title."""
    return flask.url_for("article_page", url_title=title.replace(" ", "_"))


def get_hit_count(q: str) -> int:
    """Search Wikipedia and return hit count."""
    return typing.cast(int, run_search(q, limit=0)["searchinfo"]["totalhits"])


def search_count(q: str) -> int:
    """How often does this article title appear in Wikipedia.

    One hit is subtracted (presumably the article itself). The result is
    clamped at zero so a search with no hits does not report -1.
    """
    return max(get_hit_count(article_title_to_search_query(q)) - 1, 0)


def search_count_with_link(q: str, redirect_to: str | None = None) -> int:
    """Articles in Wikipedia that include this search term and a link.

    When redirect_to is given, articles linking to the redirect target are
    counted as well.
    """
    base_query = article_title_to_search_query(q)
    count = get_hit_count(base_query + f' linksto:"{q}"')
    if redirect_to:
        count += get_hit_count(base_query + f' linksto:"{redirect_to}"')
    return count


def search_no_link(q: str, redirect_to: str | None = None) -> tuple[int, list[Hit]]:
    """Search for mentions of article title with no link included.

    Returns a tuple of (total hit count, list of hits).
    """
    exclude = f' -linksto:"{q}"'
    if redirect_to:
        exclude += f' -linksto:"{redirect_to}"'
    query = run_search(article_title_to_search_query(q) + exclude, "max")
    return (query["searchinfo"]["totalhits"], query["search"])


@app.before_request
def global_user() -> None:
    """Make username available everywhere."""
    flask.g.user = mediawiki_oauth.get_username()
    flask.g.oauth_session = mediawiki_oauth.get_oauth_session()


def _linked_ratio(example: dict[str, str | int]) -> float:
    """Fraction of mentions that are already linked; 0.0 when total is zero."""
    total = float(example["total"])
    return float(example["with_links"]) / total if total else 0.0


@app.route("/")
def index() -> str | Response:
    """Index page.

    Also forwards OAuth callbacks that arrive here to the callback view, and
    redirects a non-empty ``q`` parameter to the matching article page.
    """
    if "oauth_verifier" in flask.request.args and "oauth_token" in flask.request.args:
        url = flask.url_for("oauth_callback", **flask.request.args)  # type: ignore
        return flask.redirect(url)

    if q := flask.request.args.get("q"):
        if q_trimmed := q.strip():
            return flask.redirect(article_url(q_trimmed))

    debug = flask.request.args.get("debug")
    examples: list[dict[str, str | int]] = []
    if debug:
        examples = load_examples()
        examples.sort(key=_linked_ratio, reverse=True)

    return flask.render_template(
        "index.html", examples=examples, article_url=article_url, debug=debug
    )


def case_flip(s: str) -> str:
    """
    Switch the case of a single character.

    If the character is lowercase, it is converted to uppercase. If it is
    uppercase, it is converted to lowercase. Non-alphabetic characters remain
    unchanged.

    Args:
        s (str): A single character string.

    Returns:
        str: The character with its case flipped, or the original character
            if it's not a letter.

    Example:
        >>> case_flip('a')
        'A'
        >>> case_flip('A')
        'a'
        >>> case_flip('1')
        '1'
    """
    if s.islower():
        return s.upper()
    if s.isupper():
        return s.lower()
    return s


def case_flip_first(s: str) -> str:
    """Switch case of first character in string.

    An empty string is returned unchanged.

    >>> case_flip_first('foo')
    'Foo'
    >>> case_flip_first('')
    ''
    """
    # Slicing instead of indexing keeps the empty string from raising.
    return case_flip(s[:1]) + s[1:]


def tidy_snippet(snippet: str) -> str:
    """Remove HTML from snippet.

    Strips the ``<span class="searchmatch">`` highlight markup that the
    MediaWiki search API puts around matched terms, normalises en dashes to
    hyphens and unescapes HTML entities.
    """
    snippet = snippet.replace("\u2013", "-")
    snippet = snippet.replace("</span>", "")
    snippet = snippet.replace('<span class="searchmatch">', "")
    return html.unescape(snippet)


@app.route("/oauth/start")
def start_oauth() -> Response:
    """Start OAuth.

    Fetches a request token, stores it in the session and redirects the user
    to the wiki's authorization page. An optional ``next`` parameter is
    remembered as the post-login destination if it points at this site.
    """
    next_page = flask.request.args.get("next")
    # Only remember same-site destinations; "next" is user-controlled.
    if next_page and _is_safe_redirect_target(next_page):
        flask.session["after_login"] = next_page

    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]
    request_token_url = wiki_index_php + "?title=Special%3aOAuth%2finitiate"

    oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob")
    oauth.headers.update({"User-Agent": api.ua})
    try:
        fetch_response = oauth.fetch_request_token(request_token_url)
    except TokenRequestDenied as e:
        return _error_page(str(e))

    owner_key = fetch_response.get("oauth_token")
    owner_secret = fetch_response.get("oauth_token_secret")
    if not (owner_key and owner_secret):
        # Explicit check rather than assert, which is stripped under -O.
        return _error_page("OAuth request token response was incomplete")

    flask.session["owner_key"] = owner_key
    flask.session["owner_secret"] = owner_secret

    base_authorization_url = f"https://{wiki_hostname}/wiki/Special:OAuth/authorize"
    authorization_url = oauth.authorization_url(
        base_authorization_url, oauth_consumer_key=client_key
    )
    return flask.redirect(authorization_url)


@app.route("/oauth/callback", methods=["GET"])
def oauth_callback() -> werkzeug.wrappers.response.Response:
    """Oauth callback.

    Exchanges the request token stored in the session, plus the verifier from
    the callback URL, for an access token, stores it in the session and
    redirects to the remembered post-login page or the index.
    """
    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]

    owner_key = flask.session.get("owner_key")
    owner_secret = flask.session.get("owner_secret")
    if not (owner_key and owner_secret):
        # Callback reached without a request token in the session, for
        # example opened directly or after the session cookie was lost.
        return _error_page("OAuth session not found, please try logging in again", 400)

    oauth = OAuth1Session(
        client_key,
        client_secret=client_secret,
        resource_owner_key=owner_key,
        resource_owner_secret=owner_secret,
    )
    oauth_response = oauth.parse_authorization_response(flask.request.url)
    verifier = oauth_response.get("oauth_verifier")
    if not verifier:
        return _error_page("OAuth verifier missing from callback", 400)

    access_token_url = wiki_index_php + "?title=Special%3aOAuth%2ftoken"
    oauth = OAuth1Session(
        client_key,
        client_secret=client_secret,
        resource_owner_key=owner_key,
        resource_owner_secret=owner_secret,
        verifier=verifier,
    )
    oauth.headers.update({"User-Agent": api.ua})

    try:
        oauth_tokens = oauth.fetch_access_token(access_token_url)
    except TokenRequestDenied as e:
        return _error_page(str(e))

    flask.session["owner_key"] = oauth_tokens.get("oauth_token")
    flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")

    username = mediawiki_oauth.get_username()
    print(f"login successful: {username}", file=sys.stderr)

    # Consume the stored destination so a later login does not reuse it, and
    # re-validate it before redirecting.
    next_page = flask.session.pop("after_login", None)
    if not (next_page and _is_safe_redirect_target(next_page)):
        next_page = flask.url_for("index")
    return flask.redirect(next_page)


@app.route("/oauth/disconnect")
def oauth_disconnect() -> werkzeug.wrappers.response.Response:
    """Disconnect OAuth by removing the login-related keys from the session."""
    for key in "owner_key", "owner_secret", "username", "after_login":
        flask.session.pop(key, None)
    return flask.redirect(flask.url_for("index"))


def match_type(q: str, snippet: str) -> str | None:
    """Discover match type, 'exact', 'case_mismatch' or None.

    >>> match_type('foo', 'foo')
    'exact'
    >>> match_type('foo', 'bar') is None
    True
    >>> match_type('bar', 'foo bar baz')
    'exact'
    >>> match_type('clean coal technology', 'foo clean coal technologies baz')
    'exact'
    >>> match_type('bar', 'foo Bar baz')
    'exact'
    >>> match_type('bar', 'foo BAR baz')
    'case_mismatch'
    >>> match_type('foo-bar', 'aa foo-bar cc')
    'exact'
    >>> match_type(u'foo\u2013bar', 'aa foo-bar cc')
    'exact'
    """
    q = q.replace("\u2013", "-")
    snippet = tidy_snippet(snippet)

    if q in snippet or case_flip_first(q) in snippet:
        return "exact"

    match = "case_mismatch" if q.lower() in snippet.lower() else None

    if q.endswith("y"):
        # Dropping the final "y" lets "technology" match "technologies".
        stem = q[:-1]
        if stem in snippet or case_flip_first(stem) in snippet:
            return "exact"
    elif match is None and q[:-1].lower() in snippet.lower():
        # Last resort: the title minus its final character, ignoring case.
        match = "case_mismatch"

    return match


def _record_skip(from_title: str, hit_title: str) -> None:
    """Record that a candidate was skipped or saved for this article."""
    skipped: dict[str, list[str]] = flask.session.get("skipped", {})
    article_skipped = skipped.get(from_title, [])
    if hit_title not in article_skipped:
        skipped[from_title] = article_skipped + [hit_title]
    flask.session["skipped"] = skipped
    # The dict may have been mutated in place, so flag the change explicitly.
    flask.session.modified = True


def handle_post(url_title: str) -> Response:
    """Handle POST request: save the link, update counters, move to next hit."""
    from_title = url_title.replace("_", " ").strip()
    hit_title = flask.request.form["hit"]
    try:
        do_save(from_title, hit_title)
    except mediawiki_oauth.LoginNeeded:
        # Come back to this article once the login has completed.
        return flask.redirect(
            flask.url_for(
                "start_oauth",
                next=flask.url_for("article_page", url_title=url_title),
            )
        )
    except (mediawiki_api.APIError, api.MediawikiError) as e:
        return _error_page(f"Save failed: {e}")

    flask.session["saves"] = flask.session.get("saves", 0) + 1
    saves_by_title: dict[str, int] = flask.session.get("saves_by_title", {})
    saves_by_title[from_title] = saves_by_title.get(from_title, 0) + 1
    flask.session["saves_by_title"] = saves_by_title
    flask.session.modified = True
    _record_skip(from_title, hit_title)

    return flask.redirect(
        flask.url_for("article_page", url_title=url_title, after=hit_title)
    )


@app.route("/link/<path:url_title>", methods=["GET", "POST"])
def article_page(url_title: str) -> str | Response:
    """Article page.

    POST saves a link (see handle_post). GET lists candidate articles that
    mention the title without linking to it, excluding candidates already
    handled in this session and the article itself.
    """
    if flask.request.method == "POST":
        return handle_post(url_title)

    from_title = url_title.replace("_", " ").strip()

    try:
        redirect_to = api.get_wiki_info(from_title)
    except (api.MissingPage, api.MultipleRedirects, api.MediawikiError):
        redirect_to = None

    try:
        total = search_count(from_title)
        with_link = search_count_with_link(from_title, redirect_to)
        _no_link_count, hits = search_no_link(from_title, redirect_to)
    except api.MediawikiError as e:
        return _error_page(str(e))

    # Filter out candidates already processed this session
    session_skipped: set[str] = set(
        flask.session.get("skipped", {}).get(from_title, [])
    )

    # If a specific candidate was requested, move it to the front
    title_param = flask.request.args.get("title")
    if title_param:
        hits = [h for h in hits if h["title"] == title_param] + [
            h for h in hits if h["title"] != title_param
        ]

    # Record and apply explicit skip-past
    after = flask.request.args.get("after")
    if after:
        _record_skip(from_title, after)
        session_skipped.add(after)

    # The article itself (in either first-letter case) is never a candidate.
    excluded = session_skipped | {from_title, case_flip_first(from_title)}
    hits = [h for h in hits if h["title"] not in excluded]

    if not hits:
        return flask.render_template("all_done.html")

    saves_this_session = flask.session.get("saves_by_title", {}).get(from_title, 0)

    return flask.render_template(
        "article.html",
        title=from_title,
        redirect_to=redirect_to,
        total=total,
        with_link=with_link,
        hits=hits,
        url_title=url_title,
        saves_this_session=saves_this_session,
    )


def do_save(title: str, hit_title: str) -> str:
    """Update page on Wikipedia.

    Finds the unlinked mention of title in hit_title and submits the edited
    section text. Exceptions from the token lookup, match and edit calls
    propagate to the caller.
    """
    token = mediawiki_oauth.get_token()

    try:
        redirect_to = api.get_wiki_info(title)
    except (api.MissingPage, api.MultipleRedirects, api.MediawikiError):
        redirect_to = None

    found = get_match(title, hit_title, redirect_to)

    summary = (
        f"link [[{found['replacement']}]] using [[:en:User:Edward/Find link|Find link]]"
    )

    # NOTE(review): the return annotation says str; confirm that matches what
    # mediawiki_api.edit_page returns.
    edit = mediawiki_api.edit_page(
        pageid=found["pageid"],
        section=found["section_num"],
        text=found["section_text"],
        summary=summary,
        baserevid=found["revid"],
        token=token,
    )

    return edit


@app.route("/saved")
def save_done() -> str:
    """Save complete."""
    return flask.render_template("save_done.html")


@app.route("/api/1/hits")
def api_hits() -> werkzeug.wrappers.response.Response:
    """Return candidates for the given article title.

    Responds with HTTP 400 when the ``title`` parameter is missing or empty.
    """
    title = flask.request.args.get("title")
    if not title:
        response = flask.jsonify(error="missing title parameter")
        response.status_code = 400
        return response
    ret = core.do_search(title)
    return flask.jsonify(title=title, hits=ret["results"])


@app.route("/api/1/valid_hit")
def api_valid_hit() -> werkzeug.wrappers.response.Response:
    """Check if a candidate article has a valid unlinked mention."""
    link_to = flask.request.args["link_to"]
    link_from = flask.request.args["link_from"]
    redirect_to = flask.request.args.get("redirect_to") or None

    try:
        found = get_diff(link_to, link_from, redirect_to)
    except NoMatch:
        # Remember the miss so this candidate is not offered again.
        _record_skip(link_to, link_from)
        return flask.jsonify(valid=False)
    except api.MediawikiError as e:
        return flask.jsonify(valid=False, error=str(e))

    return flask.jsonify(valid=True, diff=found["diff"], replacement=found["replacement"])


@app.route("/favicon.ico")
def favicon() -> Response:
    """No favicon."""
    return flask.Response(status=404)


if __name__ == "__main__":
    # NOTE(review): listens on all interfaces with app.debug enabled above.
    app.run(host="0.0.0.0", port=8000)