diff --git a/add_links/api.py b/add_links/api.py index e0174b7..05fc7b1 100644 --- a/add_links/api.py +++ b/add_links/api.py @@ -272,6 +272,10 @@ def call_get_diff(title: str, section_num: int, section_text: str) -> str: } s = get_session() - ret = s.post(get_query_url(), data=data).json() + r = s.post(get_query_url(), data=data) + try: + ret = r.json() + except JSONDecodeError: + raise MediawikiError(f"HTTP {r.status_code}: {r.text[:200]!r}") check_for_error(ret) return typing.cast(str, ret["query"]["pages"][0]["revisions"][0]["diff"]["body"]) diff --git a/add_links/match.py b/add_links/match.py index cc18fdc..5cd4d21 100644 --- a/add_links/match.py +++ b/add_links/match.py @@ -8,6 +8,12 @@ from .core import get_case_from_content, get_content_and_timestamp, get_revision from .util import is_title_case, lc_alpha re_link_in_text = re.compile(r"\[\[[^]]+?\]\]", re.I | re.S) +re_category_link = re.compile(r"\[\[Category:[^\]]+\]\]", re.I) + +# Matches when extra words separate the match from a following (ABBREVIATION), +# indicating the matched text is just part of a longer named entity. +# e.g. "of Russia (AGMR)" matches; " (AGMR)" does not (no intervening words). +re_named_entity_abbrev = re.compile(r"^(?:\s+[^\s(]+){1,6}\s*\([A-Z]{2,}\)") class LinkReplace(Exception): @@ -72,14 +78,62 @@ re_cite = re.compile( re.I | re.S, ) +re_cite_template_start = re.compile(r"\{\{(?:cite|citation|short description|gli|defn)\b", re.I) +re_external_link = re.compile(r"\[https?://[^\]]+\]") +# Italic text (work titles in bibliographies). Handles apostrophes in content +# (e.g. ''It's fine'') but requires at least one non-apostrophe character so +# that ''' bold ''' is not consumed as italic. +re_italic = re.compile(r"''[^']+(?:'[^']+)*''") +# Bullet-point lines that contain a bare URL are unformatted bibliography entries. +re_bullet_with_url = re.compile(r"^\*[^\n]*https?://[^\s\n]+[^\n]*", re.MULTILINE) + + +def find_cite_template_spans(text: str) -> list[tuple[int, int]]: + """Find (start, end) spans of {{Cite ...}} templates, handling nested braces.""" + spans: list[tuple[int, int]] = [] + for m in re_cite_template_start.finditer(text): + start = m.start() + if any(s <= start < e for s, e in spans): + continue # already inside a found span + depth = 0 + i = start + while i < len(text): + if text[i : i + 2] == "{{": + depth += 1 + i += 2 + elif text[i : i + 2] == "}}": + depth -= 1 + i += 2 + if depth == 0: + spans.append((start, i)) + break + else: + i += 1 + return spans + def parse_cite(text: str) -> typing.Iterator[tuple[str, str]]: - """Parse a citation template.""" + """Parse citations yielding (type, chunk) tuples, skipping ref tags, cite templates, and external links.""" + regions = [(m.start(), m.end()) for m in re_cite.finditer(text)] + regions.extend(find_cite_template_spans(text)) + regions.extend((m.start(), m.end()) for m in re_external_link.finditer(text)) + regions.extend((m.start(), m.end()) for m in re_italic.finditer(text)) + regions.extend((m.start(), m.end()) for m in re_bullet_with_url.finditer(text)) + regions.sort() + + # Merge overlapping regions (e.g. a {{Cite}} that sits inside a ) + merged: list[tuple[int, int]] = [] + for start, end in regions: + if merged and start < merged[-1][1]: + merged[-1] = (merged[-1][0], max(merged[-1][1], end)) + else: + merged.append((start, end)) + prev = 0 - for m in re_cite.finditer(text): - yield ("text", text[prev : m.start()]) - yield ("cite", m.group(0)) - prev = m.end() + for start, end in merged: + yield ("text", text[prev:start]) + yield ("cite", text[start:end]) + prev = end yield ("text", text[prev:]) @@ -163,6 +217,8 @@ def parse_links(text: str) -> typing.Iterator[tuple[str, str]]: for prefix in ("file:", "image:") ): yield ("image", m.group(0)) + elif m.group().lower().startswith("[[category:"): + yield ("category", m.group(0)) else: yield ("link", m.group(0)) prev = m.end() @@ -186,12 +242,66 @@ def mk_link_matcher(q: str) -> typing.Callable[[str], re.Match[str] | None]: def add_link(m: re.Match[str], replacement: str, text: str) -> str: """Add link to text.""" - matched_text = m.group(0) if matched_text.startswith("[[") and matched_text.endswith("|"): return m.re.sub(lambda m: f"[[{replacement}|", text, count=1) - else: - return m.re.sub(lambda m: f"[[{replacement}]]", text, count=1) + + inner_bracket = matched_text.find("[[") + if inner_bracket > 0: + prefix = matched_text[:inner_bracket].rstrip() + sep = matched_text[len(prefix) : inner_bracket] + suffix = matched_text[inner_bracket:] + link_dest = replacement.split("|")[0] if "|" in replacement else replacement + if text[m.end() : m.end() + 2] == "]]": + # The existing [[link]] is fully consumed by the match (its closing ]] + # immediately follows). Replace everything with a single clean link. + # e.g. "surface [[runoff (hydrology)|runoff]]" → "[[surface runoff]]" + return text[: m.start()] + f"[[{link_dest}]]" + text[m.end() + 2 :] + # The existing link is only partially consumed; keep it and link just the prefix. + # e.g. "cross-platform [[interchange station]]" + # → "[[cross-platform interchange|cross-platform]] [[interchange station]]" + return text[: m.start()] + f"[[{link_dest}|{prefix}]]{sep}{suffix}" + text[m.end() :] + + close_bracket = matched_text.find("]]") + if close_bracket > 0 and m.start() >= 2 and text[m.start() - 2 : m.start()] == "[[": + # Match started inside an existing [[link]] and spans beyond its closing ]]. + # e.g. "[[anti-globalization]] movement" matched as "anti-globalization]] movement" + # → absorb the leading [[ and produce "[[anti-globalization movement]]" + return text[: m.start() - 2] + f"[[{replacement}]]" + text[m.end() :] + + return m.re.sub(lambda m: f"[[{replacement}]]", text, count=1) + + +def match_spans_existing_link(m: re.Match[str], text: str) -> bool: + """Return True if the match starts partway inside an existing [[link]]. + + The trans2 space pattern allows matching across ]] boundaries, which is + intentional for cases like [[anti-globalization]] movement (match starts + right after [[). But when the match starts *inside* a link (e.g. + [[impervious surface]] runoff matching 'surface runoff'), absorbing the + brackets would produce broken wikitext — skip those matches instead. + """ + close_bracket = m.group(0).find("]]") + if close_bracket <= 0: + return False + # close_bracket > 0: match spans ][. If it starts right after [[ that + # opening bracket is absorbed by add_link — not our problem here. + if m.start() >= 2 and text[m.start() - 2 : m.start()] == "[[": + return False + return True + + +def is_part_of_named_entity(m: re.Match[str], text: str) -> bool: + """Return True if the match is a title-case fragment of a longer named entity. + + Detects the pattern: matched text (title case) + extra words + (ABBREVIATION). + e.g. "Anti-Globalization Movement" followed by "of Russia (AGMR)" → True. + But "Anti-Globalization Movement" followed directly by "(AGM)" → False, + since the match itself is the full entity name. + """ + if not m.group(0)[0].isupper(): + return False + return bool(re_named_entity_abbrev.match(text[m.end() :])) def find_link_in_chunk( @@ -252,8 +362,9 @@ def find_link_in_chunk( if not replacement: if bad_link_match: raise LinkReplace - m = search_for_link(content) - if m: + masked = re_category_link.sub(lambda c: " " * len(c.group(0)), content) + m = search_for_link(masked) + if m and not is_part_of_named_entity(m, content) and not match_spans_existing_link(m, content): found_text_to_link = m.group(0) replacement = match_found(m, q, linkto) new_content = add_link(m, replacement, content) @@ -342,8 +453,9 @@ def find_link_and_section(q: str, content: str, linkto: str | None = None): if replacement: text = new_text else: - m = search_for_link(text) - if m: + masked = re_category_link.sub(lambda c: " " * len(c.group(0)), text) + m = search_for_link(masked) + if m and not is_part_of_named_entity(m, text) and not match_spans_existing_link(m, text): replacement = match_found(m, q, linkto) text = add_link(m, replacement, text) new_content += text diff --git a/add_links/mediawiki_api.py b/add_links/mediawiki_api.py index c834021..611ab5b 100644 --- a/add_links/mediawiki_api.py +++ b/add_links/mediawiki_api.py @@ -4,8 +4,14 @@ import typing from pprint import pprint from typing import Any, cast +import requests + from . import mediawiki_oauth + +class APIError(Exception): + """Unexpected response from the MediaWiki API.""" + wiki_hostname = "en.wikipedia.org" wiki_api_php = f"https://{wiki_hostname}/w/api.php" user_agent = "add-links/0.1" @@ -27,10 +33,13 @@ def parse_page(enwiki: str) -> dict[str, Any]: return parse -def call(params: dict[str, str | int]) -> dict[str, typing.Any]: +def call(params: dict[str, str | int], timeout: int = 4) -> dict[str, typing.Any]: """Make GET request to mediawiki API.""" - data = mediawiki_oauth.api_post_request(params) - return cast(dict[str, Any], data.json()) + r = mediawiki_oauth.api_post_request(params, timeout=timeout) + try: + return cast(dict[str, Any], r.json()) + except requests.exceptions.JSONDecodeError: + raise APIError(f"HTTP {r.status_code}: {r.text[:200]!r}") def article_exists(title: str) -> bool: @@ -92,7 +101,7 @@ def edit_page( "summary": summary, "section": section, } - ret = call(params) + ret = call(params, timeout=30) if "edit" not in ret: print("params") pprint(params) diff --git a/add_links/mediawiki_oauth.py b/add_links/mediawiki_oauth.py index 9b4de72..39aecb0 100644 --- a/add_links/mediawiki_oauth.py +++ b/add_links/mediawiki_oauth.py @@ -8,6 +8,8 @@ import requests from flask import current_app, session from requests_oauthlib import OAuth1Session +from .api import ua + wiki_hostname = "en.wikipedia.org" api_url = f"https://{wiki_hostname}/w/api.php" @@ -27,7 +29,7 @@ def get_edit_proxy() -> dict[str, str]: return {} -def api_post_request(params: dict[str, str | int]) -> requests.Response: +def api_post_request(params: dict[str, str | int], timeout: int = 4) -> requests.Response: """HTTP Post using Oauth.""" app = current_app # url = "https://www.wikidata.org/w/api.php" @@ -39,8 +41,9 @@ def api_post_request(params: dict[str, str | int]) -> requests.Response: resource_owner_key=session["owner_key"], resource_owner_secret=session["owner_secret"], ) + oauth.headers.update({"User-Agent": ua}) proxies = get_edit_proxy() - return oauth.post(api_url, data=params, timeout=4, proxies=proxies) + return oauth.post(api_url, data=params, timeout=timeout, proxies=proxies) def raw_request(params: typing.Mapping[str, str | int]) -> requests.Response: @@ -57,6 +60,7 @@ def raw_request(params: typing.Mapping[str, str | int]) -> requests.Response: resource_owner_key=session["owner_key"], resource_owner_secret=session["owner_secret"], ) + oauth.headers.update({"User-Agent": ua}) proxies = get_edit_proxy() return oauth.get( api_url + "?" + urllib.parse.urlencode(params), timeout=4, proxies=proxies diff --git a/web_view.py b/web_view.py index 96ab370..32fb1da 100755 --- a/web_view.py +++ b/web_view.py @@ -186,6 +186,7 @@ def start_oauth() -> Response: request_token_url = wiki_index_php + "?title=Special%3aOAuth%2finitiate" oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob") + oauth.headers.update({"User-Agent": api.ua}) fetch_response = oauth.fetch_request_token(request_token_url) flask.session["owner_key"] = fetch_response.get("oauth_token") @@ -223,7 +224,7 @@ def oauth_callback() -> werkzeug.wrappers.response.Response: resource_owner_secret=flask.session["owner_secret"], verifier=verifier, ) - + oauth.headers.update({"User-Agent": api.ua}) oauth_tokens = oauth.fetch_access_token(access_token_url) flask.session["owner_key"] = oauth_tokens.get("oauth_token") flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret") @@ -298,27 +299,37 @@ def get_best_hit(title: str, hits: list[Hit]) -> tuple[Hit, dict[str, typing.Any except NoMatch: print("no match") continue + except api.MediawikiError as e: + print(f"MediawikiError for {hit['title']!r}: {e}") + continue return (hit, found) raise NoGoodHit +def handle_post(url_title: str) -> Response: + """Handle POST request.""" + from_title = url_title.replace("_", " ").strip() + hit_title = flask.request.form["hit"] + try: + do_save(from_title, hit_title) + except mediawiki_oauth.LoginNeeded: + return flask.redirect(flask.url_for("start_oauth")) + except mediawiki_api.APIError as e: + return flask.make_response(f"Save failed: {e}", 502) + return flask.redirect( + flask.url_for("article_page", url_title=url_title, after=hit_title) + ) + + @app.route("/link/", methods=["GET", "POST"]) def article_page(url_title: str) -> str | Response: """Article page.""" - from_title = url_title.replace("_", " ").strip() - if flask.request.method == "POST": - hit_title = flask.request.form["hit"] - try: - do_save(from_title, hit_title) - except mediawiki_oauth.LoginNeeded: - return flask.redirect(flask.url_for("start_oauth")) - return flask.redirect( - flask.url_for("article_page", url_title=url_title, after=hit_title) - ) + return handle_post(url_title) + from_title = url_title.replace("_", " ").strip() article_title = flask.request.args.get("title") total = search_count(from_title)