From f07b407e7a2cf772ebbc97f4b73c90e59011de7d Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Wed, 4 Oct 2023 12:56:21 +0100 Subject: [PATCH] Initial commit --- .gitignore | 4 + add_front_end_libraries.py | 22 ++ add_links/__init__.py | 0 add_links/api.py | 284 ++++++++++++++++++++++++ add_links/core.py | 198 +++++++++++++++++ add_links/language.py | 146 +++++++++++++ add_links/match.py | 381 +++++++++++++++++++++++++++++++++ add_links/mediawiki_api.py | 101 +++++++++ add_links/mediawiki_api_old.py | 48 +++++ add_links/util.py | 115 ++++++++++ add_links/wikidata_oauth.py | 98 +++++++++ add_links/wikipedia.py | 201 +++++++++++++++++ cmdline.py | 114 ++++++++++ frontend/.eslintrc.js | 23 ++ frontend/App.vue | 55 +++++ frontend/entry.js | 7 + package.json | 18 ++ templates/all_done.html | 10 + templates/article.html | 56 +++++ templates/article2.html | 66 ++++++ templates/base.html | 22 ++ templates/index.html | 25 +++ templates/save_done.html | 10 + vite.config.js | 17 ++ web_view.py | 362 +++++++++++++++++++++++++++++++ 25 files changed, 2383 insertions(+) create mode 100644 .gitignore create mode 100755 add_front_end_libraries.py create mode 100644 add_links/__init__.py create mode 100644 add_links/api.py create mode 100644 add_links/core.py create mode 100644 add_links/language.py create mode 100644 add_links/match.py create mode 100644 add_links/mediawiki_api.py create mode 100644 add_links/mediawiki_api_old.py create mode 100644 add_links/util.py create mode 100644 add_links/wikidata_oauth.py create mode 100644 add_links/wikipedia.py create mode 100755 cmdline.py create mode 100644 frontend/.eslintrc.js create mode 100644 frontend/App.vue create mode 100644 frontend/entry.js create mode 100644 package.json create mode 100644 templates/all_done.html create mode 100644 templates/article.html create mode 100644 templates/article2.html create mode 100644 templates/base.html create mode 100644 templates/index.html create mode 100644 templates/save_done.html create mode 100644 vite.config.js create mode 100755 web_view.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0dcd64f --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__ +.mypy_cache/ +node_modules +package-lock.json diff --git a/add_front_end_libraries.py b/add_front_end_libraries.py new file mode 100755 index 0000000..199db33 --- /dev/null +++ b/add_front_end_libraries.py @@ -0,0 +1,22 @@ +#!/usr/bin/python3 + +import os +import shutil +import subprocess + +STATIC_DIR = "static" + +assert os.path.exists("package.json") and os.path.exists("node_modules") + +if not os.path.exists(STATIC_DIR): + os.mkdir(STATIC_DIR) + +shutil.copytree( + "node_modules/bootstrap/dist/", + os.path.join(STATIC_DIR, "bootstrap"), + dirs_exist_ok=True, +) + +subprocess.run(["npm", "run", "build"], check=True) + +shutil.copy("dist/add_links.es.js", "static") diff --git a/add_links/__init__.py b/add_links/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/add_links/api.py b/add_links/api.py new file mode 100644 index 0000000..d84f4d6 --- /dev/null +++ b/add_links/api.py @@ -0,0 +1,284 @@ +import re +from typing import Any + +import requests +from requests.adapters import HTTPAdapter +from simplejson.scanner import JSONDecodeError + +from .language import get_current_language +from .util import is_disambig + +ua = ( + "find-link/2.2 " + + "(https://github.com/EdwardBetts/find_link; contact: edward@4angle.com)" +) +re_disambig = re.compile(r"^(.*) \((.*)\)$") + + +def get_query_url() -> str: + """Get the 
wikipedia query API for the current language.""" + return f"https://{get_current_language()}.wikipedia.org/w/api.php" + + +sessions = {} + + +def get_session(): + lang = get_current_language() + if lang in sessions: + return sessions[lang] + s = requests.Session() + s.headers = {"User-Agent": ua} + s.mount("https://en.wikipedia.org", HTTPAdapter(max_retries=10)) + s.params = { + "format": "json", + "action": "query", + "formatversion": 2, + } + sessions[lang] = s + return s + + +class MediawikiError(Exception): + pass + + +class MultipleRedirects(Exception): + pass + + +class IncompleteReply(Exception): + pass + + +class MissingPage(Exception): + pass + + +def check_for_error(json_data): + if "error" in json_data: + raise MediawikiError(json_data["error"]["info"]) + + +webpage_error = ( + "Our servers are currently under maintenance or experiencing a technical problem." +) + + +def api_get(params: dict[str, Any]) -> dict[str, Any]: + """Make call to Wikipedia API.""" + s = get_session() + + r = s.get(get_query_url(), params=params) + try: + ret = r.json() + except JSONDecodeError: + if webpage_error in r.text: + raise MediawikiError(webpage_error) + else: + raise MediawikiError("unknown error") + check_for_error(ret) + return ret + + +def get_first_page(params: dict[str, str]) -> dict[str, Any]: + """Run Wikipedia API query and return the first page.""" + page = api_get(params)["query"]["pages"][0] + if page.get("missing"): + raise MissingPage + return page + + +def random_article_list(limit=50): + params = { + "list": "random", + "rnnamespace": "0", + "rnlimit": limit, + } + + return api_get(params)["query"]["random"] + + +def wiki_search(q): + m = re_disambig.match(q) + if m: + search = '"{}" AND "{}"'.format(*m.groups()) + else: + search = '"{}"'.format(q) + + params = { + "list": "search", + "srwhat": "text", + "srlimit": 50, + "srsearch": search, + "continue": "", + } + ret = api_get(params) + query = ret["query"] + totalhits = query["searchinfo"]["totalhits"] + results = query["search"] + for _ in range(10): + if "continue" not in ret: + break + params["sroffset"] = ret["continue"]["sroffset"] + ret = api_get(params) + results += ret["query"]["search"] + return (totalhits, results) + + +def get_wiki_info(q): + params = { + "prop": "info", + "redirects": "", + "titles": q, + } + ret = api_get(params)["query"] + if "interwiki" in ret: + return None + redirects = [] + if ret.get("redirects"): + redirects = ret["redirects"] + if len(redirects) != 1: + # multiple redirects, we should explain to the user that this is + # unsupported + raise MultipleRedirects + if ret["pages"][0].get("missing"): + raise MissingPage(q) + return redirects[0]["to"] if redirects else None + + +def cat_start(q: str) -> list[str]: + """Find categories that start with this prefix.""" + params = { + "list": "allpages", + "apnamespace": 14, # categories + "apfilterredir": "nonredirects", + "aplimit": 500, + "apprefix": q, + } + ret = api_get(params)["query"] + return [i["title"] for i in ret["allpages"] if i["title"] != q] + + +def all_pages(q: str) -> list[str]: + """Get all article titles with a given prefix.""" + params = { + "list": "allpages", + "apnamespace": 0, + "apfilterredir": "nonredirects", + "aplimit": 500, + "apprefix": q, + } + ret = api_get(params)["query"] + return [i["title"] for i in ret["allpages"] if i["title"] != q] + + +def categorymembers(q: str) -> list[str]: + """List of category members.""" + params = { + "list": "categorymembers", + "cmnamespace": 0, + "cmlimit": 500, + "cmtitle": 
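+        # MediaWiki stores the first letter of titles capitalised, so
+        # normalise it here before querying category members.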
q[0].upper() + q[1:], + } + ret = api_get(params)["query"] + return [i["title"] for i in ret["categorymembers"] if i["title"] != q] + + +def page_links(titles): # unused + titles = list(titles) + assert titles + params = { + "prop": "links", + "pllimit": 500, + "plnamespace": 0, + "titles": "|".join(titles), + } + ret = api_get(params)["query"] + return dict( + (doc["title"], {l["title"] for l in doc["links"]}) + for doc in ret["pages"].values() + if "links" in doc + ) + + +def find_disambig(titles: list[str]) -> list[str]: + """Find disambiguation articles in the given list of titles.""" + titles = list(titles) + assert titles + pos = 0 + disambig: list[str] = [] + params = { + "prop": "templates", + "tllimit": 500, + "tlnamespace": 10, # templates + "continue": "", + } + while pos < len(titles): + params["titles"] = "|".join(titles[pos : pos + 50]) + ret = api_get(params) + disambig.extend( + doc["title"] for doc in ret["query"]["pages"] if is_disambig(doc) + ) + for i in range(10): + if "continue" not in ret: + break + tlcontinue = ret["continue"]["tlcontinue"] + params["titles"] = "|".join(titles[pos : pos + 50]) + params["tlcontinue"] = tlcontinue + ret = api_get(params) + disambig.extend( + doc["title"] for doc in ret["query"]["pages"] if is_disambig(doc) + ) + pos += 50 + + return disambig + + +def wiki_redirects(q): # pages that link here + params = { + "list": "backlinks", + "blfilterredir": "redirects", + "bllimit": 500, + "blnamespace": 0, + "bltitle": q, + } + docs = api_get(params)["query"]["backlinks"] + assert all("redirect" in doc for doc in docs) + return (doc["title"] for doc in docs) + + +def wiki_backlink(q: str) -> tuple[set[str], set[str]]: + """Get backlinks for article.""" + params = { + "list": "backlinks", + "bllimit": 500, + "blnamespace": 0, + "bltitle": q, + "continue": "", + } + ret = api_get(params) + docs = ret["query"]["backlinks"] + while "continue" in ret: + params["blcontinue"] = ret["continue"]["blcontinue"] + ret = api_get(params) + docs += ret["query"]["backlinks"] + + articles = {doc["title"] for doc in docs if "redirect" not in doc} + redirects = {doc["title"] for doc in docs if "redirect" in doc} + return (articles, redirects) + + +def call_get_diff(title, section_num, section_text): + data = { + "prop": "revisions", + "rvprop": "timestamp", + "titles": title, + "rvsection": section_num, + "rvdifftotext": section_text.strip(), + } + + s = get_session() + ret = s.post(get_query_url(), data=data).json() + check_for_error(ret) + return ret["query"]["pages"][0]["revisions"][0]["diff"]["body"] diff --git a/add_links/core.py b/add_links/core.py new file mode 100644 index 0000000..a130598 --- /dev/null +++ b/add_links/core.py @@ -0,0 +1,198 @@ +"""Core functions.""" + +import html +import re +import typing +from pprint import pprint + +from .api import ( + MediawikiError, + all_pages, + cat_start, + categorymembers, + find_disambig, + get_first_page, + wiki_backlink, + wiki_search, +) +from .util import case_flip_first, norm + +re_redirect = re.compile(r"#REDIRECT \[\[(.)([^#]*?)(#.*)?\]\]") + + +def get_content_and_timestamp(title: str) -> tuple[str, str]: + """Get article content and timestamp of last update.""" + params = { + "prop": "revisions|info", + "rvprop": "content|timestamp", + "titles": title, + } + json_data: dict[str, typing.Any] = get_first_page(params) + if json_data.get("invalid"): + raise MediawikiError(json_data["invalidreason"]) + rev = json_data["revisions"][0] + return (rev["content"], rev["timestamp"]) + + +def 
get_revision_info(title: str) -> dict[str, typing.Any]: + """Get info about latest revision of article.""" + params = { + "prop": "revisions|info", + "rvprop": "content|timestamp|ids", + "titles": title, + } + json_data: dict[str, typing.Any] = get_first_page(params) + if json_data.get("invalid"): + raise MediawikiError(json_data["invalidreason"]) + revs = json_data.pop("revisions") + ret = revs[0] + ret["pageid"] = json_data["pageid"] + pprint(json_data) + return typing.cast(dict[str, typing.Any], ret) + + +def is_redirect_to(title_from: str, title_to: str) -> bool: + title_from = title_from.replace("_", " ") + params = {"prop": "info", "titles": title_from} + if "redirect" not in get_first_page(params): + return False + + params = {"prop": "revisions", "rvprop": "content", "titles": title_from} + page_text = get_first_page(params)["revisions"][0]["content"] + m = re_redirect.match(page_text) + assert m + title_to = title_to[0].upper() + title_to[1:] + return m.group(1).upper() + m.group(2) == title_to + + +def find_longer( + q: str, search: list[dict[str, typing.Any]], articles: set[str] +) -> list[str]: + """Find other articles with titles that are longer.""" + this_title = q[0].upper() + q[1:] + longer: list[str] = all_pages(this_title) + lq = q.lower() + for doc in search: + lt = doc["title"].lower() + if lq == lt or lq not in lt: + continue + articles.add(doc["title"]) + more_articles, more_redirects = wiki_backlink(doc["title"]) + articles.update(more_articles) + if doc["title"] not in longer: + longer.append(doc["title"]) + + return longer + + +def tidy_snippet(snippet: str) -> str: + """Remove HTML from snippet.""" + snippet = snippet.replace("\u2013", "-") + snippet = snippet.replace("", "") + snippet = snippet.replace('', "") + return html.unescape(snippet) + + +def match_type(q: str, snippet: str) -> str | None: + """Discover match type, ''exact', 'case_mismatch' or None. 
+ + >>> match_type('foo', 'foo') + 'exact' + >>> match_type('foo', 'bar') is None + True + >>> match_type('bar', 'foo bar baz') + 'exact' + >>> match_type('clean coal technology', 'foo clean coal technologies baz') + 'exact' + >>> match_type('bar', 'foo Bar baz') + 'exact' + >>> match_type('bar', 'foo BAR baz') + 'case_mismatch' + >>> match_type('foo-bar', 'aa foo-bar cc') + 'exact' + >>> match_type(u'foo\u2013bar', 'aa foo-bar cc') + 'exact' + """ + q = q.replace("\u2013", "-") + snippet = tidy_snippet(snippet) + + if q in snippet or case_flip_first(q) in snippet: + return "exact" + match = None + if q.lower() in snippet.lower(): + match = "case_mismatch" + if match != "exact" and q.endswith("y"): + if q[:-1] in snippet or case_flip_first(q[:-1]) in snippet: + return "exact" + elif match is None: + if q[:-1].lower() in snippet.lower(): + match = "case_mismatch" + return match + + +def do_search( + q: str, redirect_to: str | None = None +) -> dict[str, int | list[dict[str, typing.Any]] | list[str] | None]: + this_title = q[0].upper() + q[1:] + + totalhits, search_hits = wiki_search(q) + articles, redirects = wiki_backlink(redirect_to or q) + cm = set() + start = cat_start(q) + if len(start) > 5: + start = [] # big categories take too long + for cat in set(["Category:" + this_title] + start): + cm.update(categorymembers(cat)) + + norm_q = norm(q) + norm_match_redirect = {r for r in redirects if norm(r) == norm_q} + longer_redirect = {r for r in redirects if q.lower() in r.lower()} + + articles.add(this_title) + if redirect_to: + articles.add(redirect_to[0].upper() + redirect_to[1:]) + + longer_redirect = {r for r in redirects if q.lower() in r.lower()} + for r in norm_match_redirect | longer_redirect: + articles.add(r) + a2, r2 = wiki_backlink(r) + articles.update(a2) + redirects.update(r2) + + longer = find_longer(q, search_hits, articles) if len(q) > 6 else None + + search: list[dict[str, typing.Any]] = [ + doc + for doc in search_hits + if doc["title"] not in articles and doc["title"] not in cm + ] + if search: + disambig = set(find_disambig([doc["title"] for doc in search])) + search = [doc for doc in search if doc["title"] not in disambig] + # and (doc['title'] not in links or this_title not in links[doc['title']])] + for doc in search: + without_markup = ( + doc["snippet"] + .replace("", "") + .replace("", "") + .replace(" ", " ") + ) + doc["match"] = match_type(q, without_markup) + doc["snippet_without_markup"] = without_markup + return { + "totalhits": totalhits, + "results": search, + "longer": longer, + } + + +def get_case_from_content(title: str) -> str | None: + """Check article content to find the case of the article title.""" + content, timestamp = get_content_and_timestamp(title) + if title == title.lower() and title in content: + return title + start = content.lower().find("'''" + title.replace("_", " ").lower() + "'''") + if start != -1: + return content[start + 3 : start + 3 + len(title)] + + return None # article doesn't contain the title diff --git a/add_links/language.py b/add_links/language.py new file mode 100644 index 0000000..7b83eaa --- /dev/null +++ b/add_links/language.py @@ -0,0 +1,146 @@ +from flask import session, has_request_context + +langs = [ + ('af', 'Afrikaans', 'Afrikaans'), + ('als', 'Alemannisch', 'Alemannic'), + ('am', 'አማርኛ', 'Amharic'), + ('an', 'aragonés', 'Aragonese'), + ('ar', 'العربية', 'Arabic'), + ('arz', 'مصرى', 'Egyptian Arabic'), + ('ast', 'asturianu', 'Asturian'), + ('az', 'azərbaycanca', 'Azerbaijani'), + ('azb', 'تۆرکجه', 'Southern 
Azerbaijani'), + ('ba', 'башҡортса', 'Bashkir'), + ('bar', 'Boarisch', 'Bavarian'), + ('bat-smg', 'žemaitėška', 'Samogitian'), + ('be', 'беларуская', 'Belarusian'), + ('be-tarask', 'беларуская (тарашкевіца)', 'Belarusian (Taraškievica)'), + ('bg', 'български', 'Bulgarian'), + ('bn', 'বাংলা', 'Bengali'), + ('bpy', 'বিষ্ণুপ্রিয়া মণিপুরী', 'Bishnupriya Manipuri'), + ('br', 'brezhoneg', 'Breton'), + ('bs', 'bosanski', 'Bosnian'), + ('bug', 'ᨅᨔ ᨕᨘᨁᨗ', 'Buginese'), + ('ca', 'català', 'Catalan'), + ('ce', 'нохчийн', 'Chechen'), + ('ceb', 'Cebuano', 'Cebuano'), + ('ckb', 'کوردیی ناوەندی', 'Kurdish (Sorani)'), + ('cs', 'čeština', 'Czech'), + ('cv', 'Чӑвашла', 'Chuvash'), + ('cy', 'Cymraeg', 'Welsh'), + ('da', 'dansk', 'Danish'), + ('de', 'Deutsch', 'German'), + ('el', 'Ελληνικά', 'Greek'), + ('en', 'English', 'English'), + ('eo', 'Esperanto', 'Esperanto'), + ('es', 'español', 'Spanish'), + ('et', 'eesti', 'Estonian'), + ('eu', 'euskara', 'Basque'), + ('fa', 'فارسی', 'Persian'), + ('fi', 'suomi', 'Finnish'), + ('fo', 'føroyskt', 'Faroese'), + ('fr', 'français', 'French'), + ('fy', 'Frysk', 'West Frisian'), + ('ga', 'Gaeilge', 'Irish'), + ('gd', 'Gàidhlig', 'Scottish Gaelic'), + ('gl', 'galego', 'Galician'), + ('gu', 'ગુજરાતી', 'Gujarati'), + ('he', 'עברית', 'Hebrew'), + ('hi', 'हिन्दी', 'Hindi'), + ('hr', 'hrvatski', 'Croatian'), + ('hsb', 'hornjoserbsce', 'Upper Sorbian'), + ('ht', 'Kreyòl ayisyen', 'Haitian'), + ('hu', 'magyar', 'Hungarian'), + ('hy', 'Հայերեն', 'Armenian'), + ('ia', 'interlingua', 'Interlingua'), + ('id', 'Bahasa Indonesia', 'Indonesian'), + ('io', 'Ido', 'Ido'), + ('is', 'íslenska', 'Icelandic'), + ('it', 'italiano', 'Italian'), + ('ja', '日本語', 'Japanese'), + ('jv', 'Basa Jawa', 'Javanese'), + ('ka', 'ქართული', 'Georgian'), + ('kk', 'қазақша', 'Kazakh'), + ('kn', 'ಕನ್ನಡ', 'Kannada'), + ('ko', '한국어', 'Korean'), + ('ku', 'Kurdî', 'Kurdish (Kurmanji)'), + ('ky', 'Кыргызча', 'Kirghiz'), + ('la', 'Latina', 'Latin'), + ('lb', 'Lëtzebuergesch', 'Luxembourgish'), + ('li', 'Limburgs', 'Limburgish'), + ('lmo', 'lumbaart', 'Lombard'), + ('lt', 'lietuvių', 'Lithuanian'), + ('lv', 'latviešu', 'Latvian'), + ('map-bms', 'Basa Banyumasan', 'Banyumasan'), + ('mg', 'Malagasy', 'Malagasy'), + ('min', 'Baso Minangkabau', 'Minangkabau'), + ('mk', 'македонски', 'Macedonian'), + ('ml', 'മലയാളം', 'Malayalam'), + ('mn', 'монгол', 'Mongolian'), + ('mr', 'मराठी', 'Marathi'), + ('mrj', 'кырык мары', 'Hill Mari'), + ('ms', 'Bahasa Melayu', 'Malay'), + ('my', 'မြန်မာဘာသာ', 'Burmese'), + ('mzn', 'مازِرونی', 'Mazandarani'), + ('nah', 'Nāhuatl', 'Nahuatl'), + ('nap', 'Napulitano', 'Neapolitan'), + ('nds', 'Plattdüütsch', 'Low Saxon'), + ('ne', 'नेपाली', 'Nepali'), + ('new', 'नेपाल भाषा', 'Newar'), + ('nl', 'Nederlands', 'Dutch'), + ('nn', 'norsk nynorsk', 'Norwegian (Nynorsk)'), + ('no', 'norsk bokmål', 'Norwegian (Bokmål)'), + ('oc', 'occitan', 'Occitan'), + ('or', 'ଓଡ଼ିଆ', 'Oriya'), + ('os', 'Ирон', 'Ossetian'), + ('pa', 'ਪੰਜਾਬੀ', 'Eastern Punjabi'), + ('pl', 'polski', 'Polish'), + ('pms', 'Piemontèis', 'Piedmontese'), + ('pnb', 'پنجابی', 'Western Punjabi'), + ('pt', 'português', 'Portuguese'), + ('qu', 'Runa Simi', 'Quechua'), + ('ro', 'română', 'Romanian'), + ('ru', 'русский', 'Russian'), + ('sa', 'संस्कृतम्', 'Sanskrit'), + ('sah', 'саха тыла', 'Sakha'), + ('scn', 'sicilianu', 'Sicilian'), + ('sco', 'Scots', 'Scots'), + ('sh', 'srpskohrvatski / српскохрватски', 'Serbo-Croatian'), + ('si', 'සිංහල', 'Sinhalese'), + ('simple', 'Simple English', 'Simple English'), + ('sk', 'slovenčina', 'Slovak'), + ('sl', 
'slovenščina', 'Slovenian'), + ('sq', 'shqip', 'Albanian'), + ('sr', 'српски / srpski', 'Serbian'), + ('su', 'Basa Sunda', 'Sundanese'), + ('sv', 'svenska', 'Swedish'), + ('sw', 'Kiswahili', 'Swahili'), + ('ta', 'தமிழ்', 'Tamil'), + ('te', 'తెలుగు', 'Telugu'), + ('tg', 'тоҷикӣ', 'Tajik'), + ('th', 'ไทย', 'Thai'), + ('tl', 'Tagalog', 'Tagalog'), + ('tr', 'Türkçe', 'Turkish'), + ('tt', 'татарча/tatarça', 'Tatar'), + ('uk', 'українська', 'Ukrainian'), + ('ur', 'اردو', 'Urdu'), + ('uz', 'oʻzbekcha/ўзбекча', 'Uzbek'), + ('vec', 'vèneto', 'Venetian'), + ('vi', 'Tiếng Việt', 'Vietnamese'), + ('vo', 'Volapük', 'Volapük'), + ('wa', 'walon', 'Walloon'), + ('war', 'Winaray', 'Waray'), + ('yi', 'ייִדיש', 'Yiddish'), + ('yo', 'Yorùbá', 'Yoruba'), + ('zh', '中文', 'Chinese'), + ('zh-min-nan', 'Bân-lâm-gú', 'Min Nan'), + ('zh-yue', '粵語', 'Cantonese'), +] + +def get_langs() -> list[dict[str, str]]: + """List of all known languages.""" + return [dict(zip(('code', 'local', 'english'), l)) for l in langs] + +def get_current_language() -> str: + """Return ISO-3166 language code for the current language.""" + return session.get('current_lang', 'en') if has_request_context() else 'en' diff --git a/add_links/match.py b/add_links/match.py new file mode 100644 index 0000000..dc3dabc --- /dev/null +++ b/add_links/match.py @@ -0,0 +1,381 @@ +from __future__ import unicode_literals + +import re +import typing + +from .api import MissingPage, call_get_diff, get_wiki_info +from .core import get_case_from_content, get_content_and_timestamp, get_revision_info +from .util import is_title_case, lc_alpha + +re_link_in_text = re.compile(r"\[\[[^]]+?\]\]", re.I | re.S) + + +class LinkReplace(Exception): + pass + + +en_dash = "\u2013" +trans = {",": ",?", " ": " *[-\n]? *"} +trans[en_dash] = trans[" "] + +trans2 = {" ": r"('?s?\]\])?'?s? 
?(\[\[(?:.+\|)?)?", "-": "[- ]"} +trans2[en_dash] = trans2[" "] + +patterns = [ + lambda q: re.compile( + r"(?]*?)?>\s*({{cite.*?}}|\[https?://[^]]*?\])\s*", re.I | re.S +) + + +def parse_cite(text: str) -> typing.Iterator[tuple[str, str]]: + """Parse a citation template.""" + prev = 0 + for m in re_cite.finditer(text): + yield ("text", text[prev : m.start()]) + yield ("cite", m.group(0)) + prev = m.end() + yield ("text", text[prev:]) + + +re_heading = re.compile(r"^\s*(=+)\s*(.+)\s*\1(|\s)*$") + + +def section_iter(text: str) -> typing.Iterator[tuple[str | None, str]]: + """Iterate sections yielding tuples of heading and section text.""" + cur_section = "" + heading = None + in_comment = False + for line in text.splitlines(True): + if "" in line: + in_comment = False + m = re_heading.match(line) + if in_comment or not m: + cur_section += line + continue + if cur_section or heading: + yield (heading, cur_section) + heading = m.group() + cur_section = "" + continue + yield (heading, cur_section) + + +def get_subsections(text: str, section_num: int) -> str: + "retrieve the text of subsections for a given section number within an article" + found = "" + collection_level = None + for num, (heading, body) in enumerate(section_iter(text)): + if heading is None: + level = 0 + else: + m = re_heading.match(heading) + assert m + level = len(m.group(1)) + if num == section_num: + collection_level = level + continue + if collection_level: + if level > collection_level: + assert heading + found += heading + body + else: + break + return found + + +def match_found(m, q, linkto): + if q[1:] == m.group(0)[1:]: + replacement = m.group(1) + q[1:] + elif any(c.isupper() for c in q[1:]) or m.group(0) == m.group(0).upper(): + replacement = q + elif is_title_case(m.group(0)): + replacement = None + replacement = get_case_from_content(q) + if replacement is None: + replacement = q.lower() + else: + replacement = m.group(1) + q[1:] + assert replacement + if linkto: + if linkto[0].isupper() and replacement[0] == linkto[0].lower(): + linkto = linkto[0].lower() + linkto[1:] + elif replacement[0].isupper(): + linkto = linkto[0].upper() + linkto[1:] + replacement = linkto + "|" + replacement + return replacement + + +def parse_links(text: str) -> typing.Iterator[tuple[str, str]]: + prev = 0 + for m in re_link_in_text.finditer(text): + if prev != m.start(): + yield ("text", text[prev : m.start()]) + if any( + m.group().lower().startswith("[[" + prefix) + for prefix in ("file:", "image:") + ): + yield ("image", m.group(0)) + else: + yield ("link", m.group(0)) + prev = m.end() + if prev < len(text): + yield ("text", text[prev:]) + + +def mk_link_matcher(q): + re_links = [p(q) for p in patterns] + + def search_for_link(text): + for re_link in re_links: + m = re_link.search(text) + if m and m.group(0).count("[[") < 4: + return m + + return search_for_link + + +def add_link(m, replacement, text): + return m.re.sub(lambda m: "[[%s]]" % replacement, text, count=1) + + +def find_link_in_chunk(q, content, linkto=None): + search_for_link = mk_link_matcher(q) + new_content = "" + replacement = None + + match_in_non_link = False + bad_link_match = False + found_text_to_link = None + + for token_type, text in parse_links(content): + if token_type == "text": + if search_for_link(text): + match_in_non_link = True + elif token_type == "image": + before, sep, link_text = text[:-2].rpartition("|") + m = search_for_link(link_text) + if m: + found_text_to_link = m.group(0) + replacement = match_found(m, q, linkto) + text = before + sep + 
add_link(m, replacement, link_text) + "]]" + elif token_type == "link" and not replacement and not match_in_non_link: + link_text = text[2:-2] + link_dest = None + if "|" in link_text: + link_dest, link_text = link_text.split("|", 1) + m = search_for_link(link_text) + if m and (not link_dest or not link_dest.startswith("#")): + lc_alpha_q = lc_alpha(q) + + bad_link_match = ( + link_dest + and len(link_dest) > len(q) + and (lc_alpha_q not in lc_alpha(link_dest)) + ) + if not link_dest: + if q in link_text and len(link_text) > len(q): + bad_link_match = True + if bad_link_match and link_dest: + try: + link_dest_redirect = get_wiki_info(link_dest) + except MissingPage: + link_dest_redirect = None + if ( + link_dest_redirect + and lc_alpha(link_dest_redirect) == lc_alpha_q + ): + bad_link_match = False + if not bad_link_match: + replacement = match_found(m, q, linkto) + found_text_to_link = m.group(0) + text = add_link(m, replacement, link_text) + new_content += text + if not replacement: + if bad_link_match: + raise LinkReplace + m = search_for_link(content) + if m: + found_text_to_link = m.group(0) + replacement = match_found(m, q, linkto) + new_content = add_link(m, replacement, content) + if linkto: + m_end = m.end() + re_extend = re.compile(m.re.pattern + r"\w*\b", re.I) + m = re_extend.search(content) + if m and m.end() > m_end: + replacement += content[m_end : m.end()] + new_content = add_link(m, replacement, content) + return (new_content, replacement, found_text_to_link) + + +def find_link_in_text(q, content): + (new_content, replacement) = find_link_in_chunk(q, content) + if replacement: + return (new_content, replacement) + raise NoMatch + + +def find_link_in_content(q, content, linkto=None): + if linkto: + try: + return find_link_in_content(linkto, content) + except NoMatch: + pass + replacement = None + new_content = "" + link_replace = False + for header, section_text in section_iter(content): + if header: + new_content += header + for token_type, text in parse_cite(section_text): + if token_type == "text" and not replacement: + try: + (new_text, replacement, replaced_text) = find_link_in_chunk( + q, text, linkto=linkto + ) + except LinkReplace: + link_replace = True + if replacement: + text = new_text + new_content += text + if replacement: + return (new_content, replacement, replaced_text) + raise LinkReplace if link_replace else NoMatch + + +def find_link_and_section(q, content, linkto=None): + if linkto: + try: + return find_link_and_section(linkto, content) + except NoMatch: + pass + sections = list(section_iter(content)) + replacement = None + + search_for_link = mk_link_matcher(q) + + found: dict[str, str | int] = {} + + for section_num, (header, section_text) in enumerate(sections): + new_content = "" + if header: + new_content += header + for token_type, text in parse_cite(section_text): + if token_type == "text" and not replacement: + new_text = "" + for token_type2, text2 in parse_links(text): + if token_type2 == "link" and not replacement: + link_text = text2[2:-2] + if "|" in link_text: + link_dest, link_text = link_text.split("|", 1) + else: + link_dest = None + m = search_for_link(link_text) + if m: + if link_dest: + found["link_dest"] = link_dest + found["link_text"] = link_text + replacement = match_found(m, q, None) + text2 = add_link(m, replacement, link_text) + new_text += text2 + if replacement: + text = new_text + else: + m = search_for_link(text) + if m: + replacement = match_found(m, q, linkto) + text = add_link(m, replacement, text) + new_content += text 
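+        # A match was made in this section: record its number, the updated
+        # wikitext and the replacement so the caller can build an edit.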
+ if replacement: + found.update( + { + "section_num": section_num, + "section_text": new_content, + "old_text": (header or "") + section_text, + "replacement": replacement, + } + ) + return found + raise NoMatch + + +def find_refs(text: str) -> list[str]: + """Find in wikitext.""" + + refs = re.findall("]*)>(.+?)", text) + print(refs) + return refs + + +def new_link_is_in_ref(replacement: str, text: str) -> bool: + """Is the new link in a .""" + link = f"[[{replacement}]]" + return any(link in ref for ref in find_refs(text)) + + +def get_match(q: str, title: str, linkto: str | None) -> dict[str, typing.Any]: + """Get match.""" + rev = get_revision_info(title) + + found: dict[str, typing.Any] = find_link_and_section(q, rev["content"], linkto) + + assert not new_link_is_in_ref(found["replacement"], found["section_text"]) + + found["revid"] = rev["revid"] + found["pageid"] = rev["pageid"] + found["section_text"] += get_subsections(rev["content"], found["section_num"]) + + return found + + +def get_diff(q: str, title: str, linkto: str | None) -> dict[str, typing.Any]: + """Get diff.""" + content, timestamp = get_content_and_timestamp(title) + found: dict[str, typing.Any] = find_link_and_section(q, content, linkto) + + if new_link_is_in_ref(found["replacement"], found["section_text"]): + raise NoMatch + + section_text = found["section_text"] + get_subsections( + content, found["section_num"] + ) + + found["diff"] = call_get_diff(title, found["section_num"], section_text) + return found diff --git a/add_links/mediawiki_api.py b/add_links/mediawiki_api.py new file mode 100644 index 0000000..fa69bde --- /dev/null +++ b/add_links/mediawiki_api.py @@ -0,0 +1,101 @@ +"""Interface with the mediawiki API.""" + +import typing +from pprint import pprint +from typing import Any, cast + +from . 
import wikidata_oauth + +wiki_hostname = "en.wikipedia.org" +wiki_api_php = f"https://{wiki_hostname}/w/api.php" +user_agent = "add-links/0.1" + + +def parse_page(enwiki: str) -> dict[str, Any]: + """Call mediawiki parse API for given article.""" + params: dict[str, str | int] = { + "action": "parse", + "format": "json", + "formatversion": 2, + "disableeditsection": 1, + "page": enwiki, + "prop": "text|links|headhtml", + "disabletoc": 1, + } + + parse: dict[str, Any] = call(params)["parse"] + return parse + + +def call(params: dict[str, str | int]) -> dict[str, typing.Any]: + """Make GET request to mediawiki API.""" + data = wikidata_oauth.api_post_request(params) + return cast(dict[str, Any], data.json()) + + +def article_exists(title: str) -> bool: + """Get article text.""" + params: dict[str, str | int] = { + "action": "query", + "format": "json", + "formatversion": 2, + "titles": title, + } + return not call(params)["query"]["pages"][0].get("missing") + + +def get_content(title: str) -> tuple[str, int]: + """Get article text.""" + params: dict[str, str | int] = { + "action": "query", + "format": "json", + "formatversion": 2, + "prop": "revisions|info", + "rvprop": "content|timestamp|ids", + "titles": title, + } + data = call(params) + rev = data["query"]["pages"][0]["revisions"][0] + content: str = rev["content"] + revid: int = int(rev["revid"]) + return content, revid + + +def compare(title: str, new_text: str) -> str: + """Generate a diff for the new article text.""" + params: dict[str, str | int] = { + "format": "json", + "formatversion": 2, + "action": "compare", + "fromtitle": title, + "toslots": "main", + "totext-main": new_text, + "prop": "diff", + } + diff: str = call(params)["compare"]["body"] + return diff + + +def edit_page( + pageid: int, section: str | int, text: str, summary: str, baserevid: str, token: str +) -> str: + """Edit a page on Wikipedia.""" + params: dict[str, str | int] = { + "format": "json", + "formatversion": 2, + "action": "edit", + "pageid": pageid, + "text": text, + "baserevid": baserevid, + "token": token, + "nocreate": 1, + "summary": summary, + "section": section, + } + ret = call(params) + if "edit" not in ret: + print("params") + pprint(params) + print() + pprint(ret) + return typing.cast(str, ret["edit"]) diff --git a/add_links/mediawiki_api_old.py b/add_links/mediawiki_api_old.py new file mode 100644 index 0000000..0196207 --- /dev/null +++ b/add_links/mediawiki_api_old.py @@ -0,0 +1,48 @@ +"""Interface with the mediawiki API.""" + +from typing import Any + +import requests + +wiki_hostname = "en.wikipedia.org" +wiki_api_php = f"https://{wiki_hostname}/w/api.php" +user_agent = "dab-mechanic/0.1" + + +def parse_page(enwiki: str) -> dict[str, Any]: + """Call mediawiki parse API for given article.""" + params: dict[str, str | int] = { + "action": "parse", + "format": "json", + "formatversion": 2, + "disableeditsection": 1, + "page": enwiki, + "prop": "text|links|headhtml", + "disabletoc": 1, + } + + parse: dict[str, Any] = get(params)["parse"] + return parse + + +def get(params: dict[str, str | int]) -> dict[str, Any]: + """Make GET request to mediawiki API.""" + data: dict[str, Any] = requests.get( + wiki_api_php, headers={"User-Agent": user_agent}, params=params + ).json() + return data + + +def get_content(title: str) -> str: + """Get article text.""" + params: dict[str, str | int] = { + "action": "query", + "format": "json", + "formatversion": 2, + "prop": "revisions|info", + "rvprop": "content|timestamp", + "titles": title, + } + data = 
get(params) + rev: str = data["query"]["pages"][0]["revisions"][0]["content"] + return rev diff --git a/add_links/util.py b/add_links/util.py new file mode 100644 index 0000000..d351d68 --- /dev/null +++ b/add_links/util.py @@ -0,0 +1,115 @@ +"""Util functions.""" + +import re +import urllib +from typing import Any + +# util functions that don't access the network + +namespaces = { + ns.casefold() + for ns in ( + "Special", + "Media", + "Talk", + "Template", + "Portal", + "Portal talk", + "Book", + "Book talk", + "Template talk", + "Draft", + "Draft talk", + "Help", + "Help talk", + "Category", + "Category talk", + "User", + "Gadget", + "Gadget talk", + "Gadget definition", + "Gadget definition talk", + "Topic", + "User talk", + "Wikipedia", + "Education Program", + "Education Program talk", + "Wikipedia talk", + "File", + "File talk", + "TimedText", + "TimedText talk", + "MediaWiki", + "Module", + "Module talk", + "MediaWiki talk", + ) +} + +re_space_or_dash = re.compile("[ -]") + + +def is_title_case(phrase: str) -> bool: + """Is a given phrase is in Title Case.""" + return all( + term[0].isupper() and term[1:].islower() + for term in re_space_or_dash.split(phrase) + if term and term[0].isalpha() + ) + + +def urlquote(value: str) -> str: + """Prepare string for use in URL param.""" + return urllib.parse.quote_plus(value.encode("utf-8")) + + +def strip_parens(q: str) -> str: + """Remove a word in parenthesis from the end of a string.""" + m = re.search(r" \(.*?\)$", q) + return q[: m.start()] if m else q + + +def starts_with_namespace(title: str) -> bool: + """Check if a title starts with a namespace.""" + return ":" in title and title.split(":", 1)[0].casefold() in namespaces + + +def is_disambig(doc: dict[str, Any]) -> bool: + """Is a this a disambiguation page.""" + return any( + "disambig" in t + or t.endswith("dis") + or "given name" in t + or t == "template:surname" + for t in (t["title"].lower() for t in doc.get("templates", [])) + ) + + +def norm(s: str) -> str: + """Normalise string.""" + s = re.sub(r"\W", "", s).lower() + return s[:-1] if s and s[-1] == "s" else s + + +def case_flip(s: str) -> str: + """Switch case of character.""" + if s.islower(): + return s.upper() + if s.isupper(): + return s.lower() + return s + + +def case_flip_first(s: str) -> str: + """Switch case of first character in string.""" + return case_flip(s[0]) + s[1:] + + +def lc_alpha(s: str) -> str: + """Lower case alphabetic characters in string.""" + return "".join(c.lower() for c in s if c.isalpha()) + + +def wiki_space_norm(s: str) -> str: + """Normalise article title.""" + return s.replace("_", " ").strip() diff --git a/add_links/wikidata_oauth.py b/add_links/wikidata_oauth.py new file mode 100644 index 0000000..d91b66e --- /dev/null +++ b/add_links/wikidata_oauth.py @@ -0,0 +1,98 @@ +import typing +import urllib +from typing import cast + +from flask import current_app, session +from requests_oauthlib import OAuth1Session + +wiki_hostname = "en.wikipedia.org" +api_url = f"https://{wiki_hostname}/w/api.php" + + +def get_edit_proxy() -> dict[str, str]: + """Retrieve proxy information from config.""" + edit_proxy = current_app.config.get("EDIT_PROXY") + if edit_proxy: + return {"http": edit_proxy, "https": edit_proxy} + else: + return {} + + +def api_post_request(params: dict[str, str | int]): + """HTTP Post using Oauth.""" + app = current_app + # url = "https://www.wikidata.org/w/api.php" + client_key = app.config["CLIENT_KEY"] + client_secret = app.config["CLIENT_SECRET"] + oauth = OAuth1Session( + 
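+        # Consumer credentials come from the app config; the resource-owner
+        # tokens were stored in the Flask session during the OAuth login flow.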
client_key, + client_secret=client_secret, + resource_owner_key=session["owner_key"], + resource_owner_secret=session["owner_secret"], + ) + proxies = get_edit_proxy() + return oauth.post(api_url, data=params, timeout=4, proxies=proxies) + + +def raw_request(params: typing.Mapping[str, str | int]): + """Low-level API request.""" + app = current_app + # url = "https://www.wikidata.org/w/api.php?" + urlencode(params) + client_key = app.config["CLIENT_KEY"] + client_secret = app.config["CLIENT_SECRET"] + oauth = OAuth1Session( + client_key, + client_secret=client_secret, + resource_owner_key=session["owner_key"], + resource_owner_secret=session["owner_secret"], + ) + proxies = get_edit_proxy() + return oauth.get( + api_url + "?" + urllib.parse.urlencode(params), timeout=4, proxies=proxies + ) + + +def api_request(params: typing.Mapping[str, str | int]) -> dict[str, typing.Any]: + """Make an API request with OAuth.""" + r = raw_request(params) + try: + return cast(dict[str, typing.Any], r.json()) + except Exception: + print("text") + print(r.text) + print("---") + raise + + +def get_token() -> str: + """Get CSRF tokebn from MediaWiki API.""" + params: dict[str, str | int] = { + "action": "query", + "meta": "tokens", + "format": "json", + "formatversion": 2, + } + reply = api_request(params) + token: str = reply["query"]["tokens"]["csrftoken"] + + return token + + +def userinfo_call() -> typing.Mapping[str, typing.Any]: + """Request user information via OAuth.""" + params = {"action": "query", "meta": "userinfo", "format": "json"} + return api_request(params) + + +def get_username() -> None | str: + """Get the username or None if not logged in.""" + if "owner_key" not in session: + return None # not authorized + + if "username" not in session: + reply = userinfo_call() + if "query" not in reply: + return None + session["username"] = reply["query"]["userinfo"]["name"] + + return cast(str, session["username"]) diff --git a/add_links/wikipedia.py b/add_links/wikipedia.py new file mode 100644 index 0000000..cf9510f --- /dev/null +++ b/add_links/wikipedia.py @@ -0,0 +1,201 @@ +from collections import defaultdict +from typing import Any, Iterator, Optional, TypedDict + +import flask +import lxml.html + +from . 
import mediawiki_api + +disambig_templates = [ + "Template:Disambiguation", + "Template:Airport disambiguation", + "Template:Biology disambiguation", + "Template:Call sign disambiguation", + "Template:Caselaw disambiguation", + "Template:Chinese title disambiguation", + "Template:Disambiguation cleanup", + "Template:Genus disambiguation", + "Template:Hospital disambiguation", + "Template:Human name disambiguation", + "Template:Human name disambiguation cleanup", + "Template:Letter-number combination disambiguation", + "Template:Mathematical disambiguation", + "Template:Military unit disambiguation", + "Template:Music disambiguation", + "Template:Number disambiguation", + "Template:Opus number disambiguation", + "Template:Phonetics disambiguation", + "Template:Place name disambiguation", + "Template:Portal disambiguation", + "Template:Road disambiguation", + "Template:School disambiguation", + "Template:Species Latin name abbreviation disambiguation", + "Template:Species Latin name disambiguation", + "Template:Station disambiguation", + "Template:Synagogue disambiguation", + "Template:Taxonomic authority disambiguation", + "Template:Taxonomy disambiguation", + "Template:Template disambiguation", + "Template:WoO number disambiguation", +] + + +def link_params(enwiki: str) -> dict[str, str | int]: + """Parameters for finding article links from the API.""" + params: dict[str, str | int] = { + "action": "query", + "format": "json", + "formatversion": 2, + "titles": enwiki, + "generator": "links", + "gpllimit": "max", + "gplnamespace": 0, + "tllimit": "max", + "redirects": 1, + "tlnamespace": 10, + "tltemplates": "|".join(disambig_templates), + "prop": "templates", + } + return params + + +def needs_disambig(link: dict[str, Any]) -> bool: + """Is this a disambiguation link.""" + return bool( + not link["title"].endswith(" (disambiguation)") and link.get("templates") + ) + + +def get_article_links(enwiki: str) -> list[str]: + """Get links that appear in this article.""" + + params: dict[str, str | int] = link_params(enwiki) + links: set[str] = set() + + redirects = defaultdict(set) + + while True: + data = mediawiki_api.get(params) + pages = data["query"].pop("pages") + for r in data["query"].pop("redirects"): + redirects[r["to"]].add(r["from"]) + + links.update(page["title"] for page in pages if needs_disambig(page)) + + if "continue" not in data: + break + + params["gplcontinue"] = data["continue"]["gplcontinue"] + + for link in set(links): + if link in redirects: + links.update(redirects[link]) + + return list(links) + + # return {link["title"] for link in r.json()["query"]["pages"][0]["links"]} + + +def get_article_html(enwiki: str) -> str: + """Parse article wikitext and return HTML.""" + text: str = mediawiki_api.parse_page(enwiki)["text"] + return text + + +class DabItem(TypedDict): + """Represent a disabiguation page.""" + + num: int + title: str + html: str + + +def delete_toc(root: lxml.html.HtmlElement) -> None: + """Delete table of contents from article HTML.""" + for toc in root.findall(".//div[@class='toc']"): + toc.getparent().remove(toc) + + +def get_dab_html(dab_num: int, title: str) -> str: + """Parse dab page and rewrite links.""" + dab_html = get_article_html(title) + root = lxml.html.fromstring(dab_html) + delete_toc(root) + + element_id_map = {e.get("id"): e for e in root.findall(".//*[@id]")} + + for a in root.findall(".//a[@href]"): + href: str | None = a.get("href") + if not href: + continue + if not href.startswith("#"): + a.set("href", "#") + a.set("onclick", f"return 
select_dab(this, {dab_num})") + continue + + destination_element = element_id_map[href[1:]] + assert destination_element is not None + destination_element.set("id", f"{dab_num}{href[1:]}") + a.set("href", f"#{dab_num}{href[1:]}") + + html: str = lxml.html.tostring(root, encoding=str) + return html + + +class Article: + """Current article we're working on.""" + + def __init__(self, enwiki: str) -> None: + """Make a new Article object.""" + self.enwiki = enwiki.replace("_", " ") + + self.links = get_article_links(enwiki) + + self.dab_list: list[DabItem] = [] + self.dab_lookup: dict[int, str] = {} + self.dab_order: list[str] = [] + self.parse: Optional[dict[str, Any]] = None + + def save_endpoint(self) -> str: + """Endpoint for saving changes.""" + href: str = flask.url_for("save", enwiki=self.enwiki.replace(" ", "_")) + return href + + def load(self) -> None: + """Load parsed article HTML.""" + self.parse = mediawiki_api.parse_page(self.enwiki) + self.root = lxml.html.fromstring(self.parse.pop("text")) + + def iter_links(self) -> Iterator[tuple[lxml.html.Element, str]]: + """Disambiguation links that need fixing.""" + seen = set() + for a in self.root.findall(".//a[@href]"): + title = a.get("title") + if title is None or title not in self.links: + continue + a.set("class", "disambig") + + if title in seen: + continue + seen.add(title) + + yield a, title + + def process_links(self) -> None: + """Process links in parsed wikitext.""" + for dab_num, (a, title) in enumerate(self.iter_links()): + a.set("id", f"dab-{dab_num}") + + dab: DabItem = { + "num": dab_num, + "title": title, + "html": get_dab_html(dab_num, title), + } + self.dab_list.append(dab) + self.dab_order.append(title) + self.dab_lookup[dab_num] = title + + def get_html(self) -> str: + """Return the processed article HTML.""" + html: str = lxml.html.tostring(self.root, encoding=str) + return html diff --git a/cmdline.py b/cmdline.py new file mode 100755 index 0000000..1d1de7c --- /dev/null +++ b/cmdline.py @@ -0,0 +1,114 @@ +#!/usr/bin/python3 + +import collections +import json +import re +import sys +import time +import typing + +from add_links import api + +# from_title = sys.argv[1] + +re_disambig = re.compile(r"^(.*) \((.*)\)$") + + +def article_title_to_search_query(title: str) -> str: + """Convert from article title to search query string.""" + m = re_disambig.match(title) + return f'"{m.group(1)}" AND "{m.group(2)}"' if m else f'"{title}"' + + +def run_search(q: str, limit: int | str = "max") -> dict[str, typing.Any]: + """Search Wikipedia.""" + params = {"list": "search", "srwhat": "text", "srlimit": limit, "srsearch": q} + return typing.cast(dict[str, typing.Any], api.api_get(params)["query"]) + + +def search_no_link(q: str) -> tuple[int, list[dict[str, str | int]]]: + """Search for mentions of article title with no link included.""" + query = run_search(article_title_to_search_query(q) + f' -linksto:"{q}"', "max") + totalhits = query["searchinfo"]["totalhits"] + results = query["search"] + return (totalhits, results) + + +def search_count(q: str) -> int: + """How often does this article title appear in Wikipedia.""" + query = run_search(article_title_to_search_query(q), limit=0) + return typing.cast(int, query["searchinfo"]["totalhits"]) - 1 + + +def search_count_with_link(q: str) -> int: + """How often does this article title appear in Wikipedia.""" + query = run_search(article_title_to_search_query(q) + f' linksto:"{q}"', limit=0) + return typing.cast(int, query["searchinfo"]["totalhits"]) + + +def parse_contribs() -> 
list[tuple[str, int]]: + re_comment = re.compile(r"^link \[\[(.*)\]\] using") + + links: collections.Counter[str] = collections.Counter() + + for line in open("../wikipedia-contribs/contribs"): + if ( + '"comment": "link ' not in line + or "using [[User:Edward/Find link|Find link]]" not in line + ): + continue + comment = json.loads(line)["comment"] + + m = re_comment.match(comment) + if not m: + continue + link = m.group(1) + + if "|" not in link: + links[link] += 1 + + return links.most_common(200) + + +with open("examples") as f: + seen = {json.loads(line)["title"] for line in f} + + +out = open("examples", "a") +for from_title, num in parse_contribs(): + if from_title in seen: + continue + count = search_count(from_title) + count_with_link = search_count_with_link(from_title) + ratio = float(count_with_link) / float(count) + + print(from_title, count, count_with_link, f"{ratio:.1%}") + print( + json.dumps( + {"title": from_title, "total": count, "with_links": count_with_link} + ), + file=out, + ) + out.flush() + time.sleep(0.1) +out.close() + +sys.exit(0) + + +count = search_count(from_title) +count_with_link = search_count_with_link(from_title) +ratio = float(count_with_link) / float(count) + +print(count, count_with_link, f"{ratio:.1%}") + +sys.exit(0) + +totalhits, search_hits = search_no_link(from_title) + +for hit in search_hits: + print(" ", hit) +print(count, count_with_link, f"{ratio:.1%}", totalhits, len(search_hits)) + +# ret = core.do_search(from_title) +# print(ret) diff --git a/frontend/.eslintrc.js b/frontend/.eslintrc.js new file mode 100644 index 0000000..24f77b6 --- /dev/null +++ b/frontend/.eslintrc.js @@ -0,0 +1,23 @@ +module.exports = { + "env": { + "browser": true, + "es6": true + }, + "extends": [ + "plugin:vue/essential", + "standard" + ], + "globals": { + "Atomics": "readonly", + "SharedArrayBuffer": "readonly" + }, + "parserOptions": { + "ecmaVersion": 14, + "sourceType": "module" + }, + "plugins": [ + "vue" + ], + "rules": { + } +}; diff --git a/frontend/App.vue b/frontend/App.vue new file mode 100644 index 0000000..a311915 --- /dev/null +++ b/frontend/App.vue @@ -0,0 +1,55 @@ + + + + + diff --git a/frontend/entry.js b/frontend/entry.js new file mode 100644 index 0000000..c3a35df --- /dev/null +++ b/frontend/entry.js @@ -0,0 +1,7 @@ +import {createApp} from 'vue'; +import App from './App.vue'; + +export default function(props) { + const app = createApp(App, props).mount('#app'); + return app; +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..b8da5b3 --- /dev/null +++ b/package.json @@ -0,0 +1,18 @@ +{ + "name": "add-links", + "version": "0.0.1", + "scripts": { + "dev": "vite", + "build": "vite build" + }, + "dependencies": { + "bootstrap": "^5.2.3", + "vue": "^3.3.4" + }, + "devDependencies": { + "@vitejs/plugin-vue": "^4.2.3", + "eslint": "^8.41.0", + "eslint-plugin-vue": "^9.13.0", + "vite": "^4.3.8" + } +} diff --git a/templates/all_done.html b/templates/all_done.html new file mode 100644 index 0000000..c346482 --- /dev/null +++ b/templates/all_done.html @@ -0,0 +1,10 @@ +{% extends "base.html" %} + +{% block title %}Index{% endblock %} + +{% block content %} +
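+{# Shown when no candidate articles are left to review. #}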
+

+<h1>All done</h1>

+
+<a href="{{ url_for('index') }}">back to index</a>
+
+{% endblock %} diff --git a/templates/article.html b/templates/article.html new file mode 100644 index 0000000..ec3d3e4 --- /dev/null +++ b/templates/article.html @@ -0,0 +1,56 @@ +{% extends "base.html" %} + +{% block title %}{{ title }}{% endblock %} + +{% block style %} + +{% endblock %} + +{% block content %} +
+

{{ self.title() }}

+
+ + +
+
+
+ + + + + +{% endblock %} + diff --git a/templates/article2.html b/templates/article2.html new file mode 100644 index 0000000..8f8ed30 --- /dev/null +++ b/templates/article2.html @@ -0,0 +1,66 @@ +{% extends "base.html" %} + +{% block title %}{{ title }}{% endblock %} + +{% block style %} + +{% endblock %} + +{% block content %} +
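+{# Review page: shows one candidate article with a diff preview and save/skip controls. #}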
+

{{ self.title() }}

+
+ + +
+ +
Username: {{ g.user }}
+ +
view article
+ +
+<a href="{{ url_for('index') }}">back to index</a>
+ +
total: {{ total }}
+
with link: {{ with_link }}
+
ratio: {{ "{:.1%}".format(with_link / total) }}
+
hit: {{ hit }}
+
replacement: {{ found.replacement }}
+
section: {{ found.section_num }}
+ + {{ diff | safe }} +
+
+ +
+ + skip +
+
+
+{% endblock %} + diff --git a/templates/base.html b/templates/base.html new file mode 100644 index 0000000..3804a16 --- /dev/null +++ b/templates/base.html @@ -0,0 +1,22 @@ + + + + + + + + + {% block title %}{% endblock %} + + + {% block style %}{% endblock %} + + + + {% block content %}{% endblock %} + + + + {% block script %}{% endblock %} + + diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..feab172 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,25 @@ +{% extends "base.html" %} + +{% block title %}Index{% endblock %} + +{% block content %} +
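+{# Lists example article titles with total mention counts and linked percentages. #}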
+

+<h1>Index</h1>

+
+ + +
+ +
Username: {{ g.user }}
+ + + {% for item in examples %} + + + + + + {% endfor %} +
{{ item.title }}{{ item.total }}{{ "{:.1%}".format(item.with_links / item.total) }}
+
+{% endblock %} diff --git a/templates/save_done.html b/templates/save_done.html new file mode 100644 index 0000000..c96cc66 --- /dev/null +++ b/templates/save_done.html @@ -0,0 +1,10 @@ +{% extends "base.html" %} + +{% block title %}Index{% endblock %} + +{% block content %} +
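+{# Confirmation page shown after an edit is saved. #}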
+

+<h1>Save done</h1>

+
Save is complete.
+
+{% endblock %} diff --git a/vite.config.js b/vite.config.js new file mode 100644 index 0000000..feeedf8 --- /dev/null +++ b/vite.config.js @@ -0,0 +1,17 @@ +import { defineConfig } from 'vite' +import vue from '@vitejs/plugin-vue' +import path from 'path' + +export default defineConfig({ + plugins: [vue()], + define: { + 'process.env.NODE_ENV': JSON.stringify('production'), + }, + build: { + lib: { + entry: path.resolve(__dirname, 'frontend/entry.js'), + name: 'AddLinks', + fileName: (format) => `add_links.${format}.js`, + }, + }, +}) diff --git a/web_view.py b/web_view.py new file mode 100755 index 0000000..aeeda4a --- /dev/null +++ b/web_view.py @@ -0,0 +1,362 @@ +#!/usr/bin/python3 + +import html +import itertools +import json +import re +import typing + +import flask +import werkzeug +from requests_oauthlib import OAuth1Session +from werkzeug.wrappers.response import Response + +from add_links import api, core, mediawiki_api, wikidata_oauth +from add_links.match import NoMatch, get_diff, get_match + +app = flask.Flask(__name__) +app.config.from_object("config.default") +app.debug = True + +wiki_hostname = "en.wikipedia.org" +wiki_api_php = f"https://{wiki_hostname}/w/api.php" +wiki_index_php = f"https://{wiki_hostname}/w/index.php" + + +class Hit(typing.TypedDict): + """Candidate articles.""" + + ns: int + title: str + pageid: int + size: int + wordcount: int + snippet: str + timestamp: str + + +re_disambig = re.compile(r"^(.*) \((.*)\)$") + + +def load_examples() -> list[dict[str, str | int]]: + """Load examples.""" + return [json.loads(line) for line in open("examples")] + + +def article_title_to_search_query(title: str) -> str: + """Convert from article title to search query string.""" + m = re_disambig.match(title) + return f'"{m.group(1)}" AND "{m.group(2)}"' if m else f'"{title}"' + + +def run_search(q: str, limit: int | str = "max") -> dict[str, typing.Any]: + """Search Wikipedia.""" + params = {"list": "search", "srwhat": "text", "srlimit": limit, "srsearch": q} + return typing.cast(dict[str, typing.Any], api.api_get(params)["query"]) + + +def article_url(title: str) -> str: + """URL for search page.""" + return flask.url_for("article_page", url_title=title.replace(" ", "_")) + + +def search_count(q: str) -> int: + """How often does this article title appear in Wikipedia.""" + query = run_search(article_title_to_search_query(q), limit=0) + return typing.cast(int, query["searchinfo"]["totalhits"]) - 1 + + +def search_count_with_link(q: str) -> int: + """How often does this article title appear in Wikipedia.""" + query = run_search(article_title_to_search_query(q) + f' linksto:"{q}"', limit=0) + return typing.cast(int, query["searchinfo"]["totalhits"]) + + +def search_no_link(q: str) -> tuple[int, list[Hit]]: + """Search for mentions of article title with no link included.""" + query = run_search(article_title_to_search_query(q) + f' -linksto:"{q}"', "max") + totalhits = query["searchinfo"]["totalhits"] + results = query["search"] + return (totalhits, results) + + +@app.before_request +def global_user() -> None: + """Make username available everywhere.""" + flask.g.user = wikidata_oauth.get_username() + + +@app.route("/") +def index() -> str | Response: + """Index page.""" + if "oauth_verifier" in flask.request.args and "oauth_token" in flask.request.args: + return flask.redirect(flask.url_for("oauth_callback", **flask.request.args)) + + examples = load_examples() + examples.sort( + key=lambda i: float(i["with_links"]) / float(i["total"]), reverse=True + ) + + if q := 
flask.request.args.get("q"): + if q_trimmed := q.strip(): + return flask.redirect(article_url(q_trimmed)) + + return flask.render_template( + "index.html", examples=examples, article_url=article_url + ) + + +def case_flip(s: str) -> str: + """Switch case of character.""" + if s.islower(): + return s.upper() + if s.isupper(): + return s.lower() + return s + + +def case_flip_first(s: str) -> str: + """Switch case of first character in string.""" + return case_flip(s[0]) + s[1:] + + +def tidy_snippet(snippet: str) -> str: + """Remove HTML from snippet.""" + snippet = snippet.replace("\u2013", "-") + snippet = snippet.replace("
", "") + snippet = snippet.replace('', "") + return html.unescape(snippet) + + +@app.route("/oauth/start") +def start_oauth() -> Response: + """Start OAuth.""" + next_page = flask.request.args.get("next") + if next_page: + flask.session["after_login"] = next_page + + client_key = app.config["CLIENT_KEY"] + client_secret = app.config["CLIENT_SECRET"] + request_token_url = wiki_index_php + "?title=Special%3aOAuth%2finitiate" + + oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob") + fetch_response = oauth.fetch_request_token(request_token_url) + + flask.session["owner_key"] = fetch_response.get("oauth_token") + flask.session["owner_secret"] = fetch_response.get("oauth_token_secret") + + base_authorization_url = f"https://{wiki_hostname}/wiki/Special:OAuth/authorize" + authorization_url = oauth.authorization_url( + base_authorization_url, oauth_consumer_key=client_key + ) + return flask.redirect(authorization_url) + + +@app.route("/oauth/callback", methods=["GET"]) +def oauth_callback() -> werkzeug.wrappers.response.Response: + """Oauth callback.""" + client_key = app.config["CLIENT_KEY"] + client_secret = app.config["CLIENT_SECRET"] + + oauth = OAuth1Session( + client_key, + client_secret=client_secret, + resource_owner_key=flask.session["owner_key"], + resource_owner_secret=flask.session["owner_secret"], + ) + + oauth_response = oauth.parse_authorization_response(flask.request.url) + verifier = oauth_response.get("oauth_verifier") + access_token_url = wiki_index_php + "?title=Special%3aOAuth%2ftoken" + oauth = OAuth1Session( + client_key, + client_secret=client_secret, + resource_owner_key=flask.session["owner_key"], + resource_owner_secret=flask.session["owner_secret"], + verifier=verifier, + ) + + oauth_tokens = oauth.fetch_access_token(access_token_url) + flask.session["owner_key"] = oauth_tokens.get("oauth_token") + flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret") + + print("login successful") + + next_page = flask.session.get("after_login") + return flask.redirect(next_page if next_page else flask.url_for("index")) + + +@app.route("/oauth/disconnect") +def oauth_disconnect() -> werkzeug.wrappers.response.Response: + """Disconnect OAuth.""" + for key in "owner_key", "owner_secret", "username", "after_login": + if key in flask.session: + del flask.session[key] + return flask.redirect(flask.url_for("index")) + + +def match_type(q: str, snippet: str) -> str | None: + """Discover match type, ''exact', 'case_mismatch' or None. 
+ + >>> match_type('foo', 'foo') + 'exact' + >>> match_type('foo', 'bar') is None + True + >>> match_type('bar', 'foo bar baz') + 'exact' + >>> match_type('clean coal technology', 'foo clean coal technologies baz') + 'exact' + >>> match_type('bar', 'foo Bar baz') + 'exact' + >>> match_type('bar', 'foo BAR baz') + 'case_mismatch' + >>> match_type('foo-bar', 'aa foo-bar cc') + 'exact' + >>> match_type(u'foo\u2013bar', 'aa foo-bar cc') + 'exact' + """ + q = q.replace("\u2013", "-") + snippet = tidy_snippet(snippet) + + if q in snippet or case_flip_first(q) in snippet: + return "exact" + match = None + if q.lower() in snippet.lower(): + match = "case_mismatch" + if match != "exact" and q.endswith("y"): + if q[:-1] in snippet or case_flip_first(q[:-1]) in snippet: + return "exact" + elif match is None: + if q[:-1].lower() in snippet.lower(): + match = "case_mismatch" + return match + + +class NoGoodHit(Exception): + pass + + +def get_best_hit(title: str, hits: list[Hit]) -> tuple[Hit, dict[str, typing.Any]]: + """Find the best hit within the search results.""" + for hit in hits: + if hit["title"].lower() == title.lower(): + continue + if match_type(title, hit["snippet"]) != "exact": + continue + + try: + print(f'get diff: {hit["title"]}, {title}') + found = get_diff(title, hit["title"], None) + except NoMatch: + print("no match") + continue + + return (hit, found) + + raise NoGoodHit + + +@app.route("/", methods=["GET", "POST"]) +def article_page(url_title: str) -> str | Response: + """Article page.""" + from_title = url_title.replace("_", " ").strip() + + if flask.request.method == "POST": + hit_title = flask.request.form["hit"] + do_save(from_title, hit_title) + return flask.redirect( + flask.url_for("article_page", url_title=url_title, after=hit_title) + ) + + total = search_count(from_title) + with_link = search_count_with_link(from_title) + + no_link_count, hits = search_no_link(from_title) + + after = flask.request.args.get("after") + if after: + print(after) + hits_iter = itertools.dropwhile(lambda hit: hit["title"] != after, hits) + skip = next(hits_iter, None) + if skip: + hits = list(hits_iter) + + try: + hit, found = get_best_hit(from_title, hits) + except NoGoodHit: + return flask.render_template("all_done.html") + + return flask.render_template( + "article2.html", + title=from_title, + total=total, + with_link=with_link, + hit=hit, + replacement=found["replacement"], + diff=found["diff"], + found=found, + url_title=url_title, + ) + + +def do_save(title: str, hit_title: str) -> str: + """Update page on Wikipedia.""" + token = wikidata_oauth.get_token() + + found = get_match(title, hit_title, None) + + summary = ( + f"link [[{found['replacement']}]] using [[:en:User:Edward/Find link|Find link]]" + ) + + edit = mediawiki_api.edit_page( + pageid=found["pageid"], + section=found["section_num"], + text=found["section_text"], + summary=summary, + baserevid=found["revid"], + token=token, + ) + + return edit + + +@app.route("/saved") +def save_done() -> str: + """Save complete.""" + return flask.render_template("save_done.html") + + +@app.route("/api/1/hits") +def api_hits() -> werkzeug.wrappers.response.Response: + """Return canidates for the given article title.""" + title = flask.request.args.get("title") + assert title + ret = core.do_search(title) + return flask.jsonify(title=title, hits=ret["results"]) + + # mock_hits: list[Hit] = json.load(open("sample.json")) + # return flask.jsonify(title=title, hits=mock_hits) + + +@app.route("/api/1/valid_hit") +def api_valid_hit() -> 
werkzeug.wrappers.response.Response:
+    """Check whether a candidate article contains a valid link target."""
+    link_from = flask.request.args.get("link_from")
+    link_to = flask.request.args.get("link_to")
+    assert link_from and link_to
+
+    try:
+        found = get_diff(link_to, link_from, None)
+    except NoMatch:
+        return flask.jsonify(valid=False)
+
+    return flask.jsonify(
+        valid=True, diff=found["diff"], replacement=found["replacement"]
+    )
+
+
+@app.route("/favicon.ico")
+def favicon() -> None:
+    flask.abort(404)
+
+
+if __name__ == "__main__":
+    app.run(host="0.0.0.0", port=8000)
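
Usage sketch (not part of the patch): a minimal way to exercise the modules above from a Python shell, assuming the add_links package from this commit is importable and the machine can reach the Wikipedia API. do_search, get_diff and NoMatch are names defined in this commit; the article title is a placeholder.

#!/usr/bin/python3
from add_links import core
from add_links.match import NoMatch, get_diff

title = "Example article"  # placeholder: title to find unlinked mentions of

# Search Wikipedia for pages that mention the title without linking to it.
hits = core.do_search(title)["results"]

for hit in hits:
    if hit["match"] != "exact":  # only consider exact-text matches
        continue
    try:
        # Build the candidate edit and ask the API for a diff preview.
        found = get_diff(title, hit["title"], None)
    except NoMatch:
        continue
    print("would link", title, "in", hit["title"])
    print(found["diff"])  # HTML diff body returned by the API
    break

The web front end drives the same flow per request: web_view.py can be run directly (it listens on port 8000) once config.default provides the OAuth CLIENT_KEY and CLIENT_SECRET it loads.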