Compare commits
3 commits
1be844e57a
...
4fe0acc167
| Author | SHA1 | Date | |
|---|---|---|---|
| 4fe0acc167 | |||
| 95ca5f755d | |||
| da83f0791d |
5 changed files with 170 additions and 30 deletions
|
|
@ -272,6 +272,10 @@ def call_get_diff(title: str, section_num: int, section_text: str) -> str:
|
|||
}
|
||||
|
||||
s = get_session()
|
||||
ret = s.post(get_query_url(), data=data).json()
|
||||
r = s.post(get_query_url(), data=data)
|
||||
try:
|
||||
ret = r.json()
|
||||
except JSONDecodeError:
|
||||
raise MediawikiError(f"HTTP {r.status_code}: {r.text[:200]!r}")
|
||||
check_for_error(ret)
|
||||
return typing.cast(str, ret["query"]["pages"][0]["revisions"][0]["diff"]["body"])
|
||||
|
|
|
|||
|
|
@ -8,6 +8,12 @@ from .core import get_case_from_content, get_content_and_timestamp, get_revision
|
|||
from .util import is_title_case, lc_alpha
|
||||
|
||||
re_link_in_text = re.compile(r"\[\[[^]]+?\]\]", re.I | re.S)
|
||||
re_category_link = re.compile(r"\[\[Category:[^\]]+\]\]", re.I)
|
||||
|
||||
# Matches when extra words separate the match from a following (ABBREVIATION),
|
||||
# indicating the matched text is just part of a longer named entity.
|
||||
# e.g. "of Russia (AGMR)" matches; " (AGMR)" does not (no intervening words).
|
||||
re_named_entity_abbrev = re.compile(r"^(?:\s+[^\s(]+){1,6}\s*\([A-Z]{2,}\)")
|
||||
|
||||
|
||||
class LinkReplace(Exception):
|
||||
|
|
@ -72,14 +78,62 @@ re_cite = re.compile(
|
|||
re.I | re.S,
|
||||
)
|
||||
|
||||
re_cite_template_start = re.compile(r"\{\{(?:cite|citation|short description|gli|defn)\b", re.I)
|
||||
re_external_link = re.compile(r"\[https?://[^\]]+\]")
|
||||
# Italic text (work titles in bibliographies). Handles apostrophes in content
|
||||
# (e.g. ''It's fine'') but requires at least one non-apostrophe character so
|
||||
# that ''' bold ''' is not consumed as italic.
|
||||
re_italic = re.compile(r"''[^']+(?:'[^']+)*''")
|
||||
# Bullet-point lines that contain a bare URL are unformatted bibliography entries.
|
||||
re_bullet_with_url = re.compile(r"^\*[^\n]*https?://[^\s\n]+[^\n]*", re.MULTILINE)
|
||||
|
||||
|
||||
def find_cite_template_spans(text: str) -> list[tuple[int, int]]:
|
||||
"""Find (start, end) spans of {{Cite ...}} templates, handling nested braces."""
|
||||
spans: list[tuple[int, int]] = []
|
||||
for m in re_cite_template_start.finditer(text):
|
||||
start = m.start()
|
||||
if any(s <= start < e for s, e in spans):
|
||||
continue # already inside a found span
|
||||
depth = 0
|
||||
i = start
|
||||
while i < len(text):
|
||||
if text[i : i + 2] == "{{":
|
||||
depth += 1
|
||||
i += 2
|
||||
elif text[i : i + 2] == "}}":
|
||||
depth -= 1
|
||||
i += 2
|
||||
if depth == 0:
|
||||
spans.append((start, i))
|
||||
break
|
||||
else:
|
||||
i += 1
|
||||
return spans
|
||||
|
||||
|
||||
def parse_cite(text: str) -> typing.Iterator[tuple[str, str]]:
|
||||
"""Parse a citation template."""
|
||||
"""Parse citations yielding (type, chunk) tuples, skipping ref tags, cite templates, and external links."""
|
||||
regions = [(m.start(), m.end()) for m in re_cite.finditer(text)]
|
||||
regions.extend(find_cite_template_spans(text))
|
||||
regions.extend((m.start(), m.end()) for m in re_external_link.finditer(text))
|
||||
regions.extend((m.start(), m.end()) for m in re_italic.finditer(text))
|
||||
regions.extend((m.start(), m.end()) for m in re_bullet_with_url.finditer(text))
|
||||
regions.sort()
|
||||
|
||||
# Merge overlapping regions (e.g. a {{Cite}} that sits inside a <ref>)
|
||||
merged: list[tuple[int, int]] = []
|
||||
for start, end in regions:
|
||||
if merged and start < merged[-1][1]:
|
||||
merged[-1] = (merged[-1][0], max(merged[-1][1], end))
|
||||
else:
|
||||
merged.append((start, end))
|
||||
|
||||
prev = 0
|
||||
for m in re_cite.finditer(text):
|
||||
yield ("text", text[prev : m.start()])
|
||||
yield ("cite", m.group(0))
|
||||
prev = m.end()
|
||||
for start, end in merged:
|
||||
yield ("text", text[prev:start])
|
||||
yield ("cite", text[start:end])
|
||||
prev = end
|
||||
yield ("text", text[prev:])
|
||||
|
||||
|
||||
|
|
@ -163,6 +217,8 @@ def parse_links(text: str) -> typing.Iterator[tuple[str, str]]:
|
|||
for prefix in ("file:", "image:")
|
||||
):
|
||||
yield ("image", m.group(0))
|
||||
elif m.group().lower().startswith("[[category:"):
|
||||
yield ("category", m.group(0))
|
||||
else:
|
||||
yield ("link", m.group(0))
|
||||
prev = m.end()
|
||||
|
|
@ -186,12 +242,66 @@ def mk_link_matcher(q: str) -> typing.Callable[[str], re.Match[str] | None]:
|
|||
|
||||
def add_link(m: re.Match[str], replacement: str, text: str) -> str:
|
||||
"""Add link to text."""
|
||||
|
||||
matched_text = m.group(0)
|
||||
if matched_text.startswith("[[") and matched_text.endswith("|"):
|
||||
return m.re.sub(lambda m: f"[[{replacement}|", text, count=1)
|
||||
else:
|
||||
return m.re.sub(lambda m: f"[[{replacement}]]", text, count=1)
|
||||
|
||||
inner_bracket = matched_text.find("[[")
|
||||
if inner_bracket > 0:
|
||||
prefix = matched_text[:inner_bracket].rstrip()
|
||||
sep = matched_text[len(prefix) : inner_bracket]
|
||||
suffix = matched_text[inner_bracket:]
|
||||
link_dest = replacement.split("|")[0] if "|" in replacement else replacement
|
||||
if text[m.end() : m.end() + 2] == "]]":
|
||||
# The existing [[link]] is fully consumed by the match (its closing ]]
|
||||
# immediately follows). Replace everything with a single clean link.
|
||||
# e.g. "surface [[runoff (hydrology)|runoff]]" → "[[surface runoff]]"
|
||||
return text[: m.start()] + f"[[{link_dest}]]" + text[m.end() + 2 :]
|
||||
# The existing link is only partially consumed; keep it and link just the prefix.
|
||||
# e.g. "cross-platform [[interchange station]]"
|
||||
# → "[[cross-platform interchange|cross-platform]] [[interchange station]]"
|
||||
return text[: m.start()] + f"[[{link_dest}|{prefix}]]{sep}{suffix}" + text[m.end() :]
|
||||
|
||||
close_bracket = matched_text.find("]]")
|
||||
if close_bracket > 0 and m.start() >= 2 and text[m.start() - 2 : m.start()] == "[[":
|
||||
# Match started inside an existing [[link]] and spans beyond its closing ]].
|
||||
# e.g. "[[anti-globalization]] movement" matched as "anti-globalization]] movement"
|
||||
# → absorb the leading [[ and produce "[[anti-globalization movement]]"
|
||||
return text[: m.start() - 2] + f"[[{replacement}]]" + text[m.end() :]
|
||||
|
||||
return m.re.sub(lambda m: f"[[{replacement}]]", text, count=1)
|
||||
|
||||
|
||||
def match_spans_existing_link(m: re.Match[str], text: str) -> bool:
|
||||
"""Return True if the match starts partway inside an existing [[link]].
|
||||
|
||||
The trans2 space pattern allows matching across ]] boundaries, which is
|
||||
intentional for cases like [[anti-globalization]] movement (match starts
|
||||
right after [[). But when the match starts *inside* a link (e.g.
|
||||
[[impervious surface]] runoff matching 'surface runoff'), absorbing the
|
||||
brackets would produce broken wikitext — skip those matches instead.
|
||||
"""
|
||||
close_bracket = m.group(0).find("]]")
|
||||
if close_bracket <= 0:
|
||||
return False
|
||||
# close_bracket > 0: match spans ][. If it starts right after [[ that
|
||||
# opening bracket is absorbed by add_link — not our problem here.
|
||||
if m.start() >= 2 and text[m.start() - 2 : m.start()] == "[[":
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_part_of_named_entity(m: re.Match[str], text: str) -> bool:
|
||||
"""Return True if the match is a title-case fragment of a longer named entity.
|
||||
|
||||
Detects the pattern: matched text (title case) + extra words + (ABBREVIATION).
|
||||
e.g. "Anti-Globalization Movement" followed by "of Russia (AGMR)" → True.
|
||||
But "Anti-Globalization Movement" followed directly by "(AGM)" → False,
|
||||
since the match itself is the full entity name.
|
||||
"""
|
||||
if not m.group(0)[0].isupper():
|
||||
return False
|
||||
return bool(re_named_entity_abbrev.match(text[m.end() :]))
|
||||
|
||||
|
||||
def find_link_in_chunk(
|
||||
|
|
@ -252,8 +362,9 @@ def find_link_in_chunk(
|
|||
if not replacement:
|
||||
if bad_link_match:
|
||||
raise LinkReplace
|
||||
m = search_for_link(content)
|
||||
if m:
|
||||
masked = re_category_link.sub(lambda c: " " * len(c.group(0)), content)
|
||||
m = search_for_link(masked)
|
||||
if m and not is_part_of_named_entity(m, content) and not match_spans_existing_link(m, content):
|
||||
found_text_to_link = m.group(0)
|
||||
replacement = match_found(m, q, linkto)
|
||||
new_content = add_link(m, replacement, content)
|
||||
|
|
@ -342,8 +453,9 @@ def find_link_and_section(q: str, content: str, linkto: str | None = None):
|
|||
if replacement:
|
||||
text = new_text
|
||||
else:
|
||||
m = search_for_link(text)
|
||||
if m:
|
||||
masked = re_category_link.sub(lambda c: " " * len(c.group(0)), text)
|
||||
m = search_for_link(masked)
|
||||
if m and not is_part_of_named_entity(m, text) and not match_spans_existing_link(m, text):
|
||||
replacement = match_found(m, q, linkto)
|
||||
text = add_link(m, replacement, text)
|
||||
new_content += text
|
||||
|
|
|
|||
|
|
@ -4,8 +4,14 @@ import typing
|
|||
from pprint import pprint
|
||||
from typing import Any, cast
|
||||
|
||||
import requests
|
||||
|
||||
from . import mediawiki_oauth
|
||||
|
||||
|
||||
class APIError(Exception):
|
||||
"""Unexpected response from the MediaWiki API."""
|
||||
|
||||
wiki_hostname = "en.wikipedia.org"
|
||||
wiki_api_php = f"https://{wiki_hostname}/w/api.php"
|
||||
user_agent = "add-links/0.1"
|
||||
|
|
@ -27,10 +33,13 @@ def parse_page(enwiki: str) -> dict[str, Any]:
|
|||
return parse
|
||||
|
||||
|
||||
def call(params: dict[str, str | int]) -> dict[str, typing.Any]:
|
||||
def call(params: dict[str, str | int], timeout: int = 4) -> dict[str, typing.Any]:
|
||||
"""Make GET request to mediawiki API."""
|
||||
data = mediawiki_oauth.api_post_request(params)
|
||||
return cast(dict[str, Any], data.json())
|
||||
r = mediawiki_oauth.api_post_request(params, timeout=timeout)
|
||||
try:
|
||||
return cast(dict[str, Any], r.json())
|
||||
except requests.exceptions.JSONDecodeError:
|
||||
raise APIError(f"HTTP {r.status_code}: {r.text[:200]!r}")
|
||||
|
||||
|
||||
def article_exists(title: str) -> bool:
|
||||
|
|
@ -92,7 +101,7 @@ def edit_page(
|
|||
"summary": summary,
|
||||
"section": section,
|
||||
}
|
||||
ret = call(params)
|
||||
ret = call(params, timeout=30)
|
||||
if "edit" not in ret:
|
||||
print("params")
|
||||
pprint(params)
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@ import requests
|
|||
from flask import current_app, session
|
||||
from requests_oauthlib import OAuth1Session
|
||||
|
||||
from .api import ua
|
||||
|
||||
wiki_hostname = "en.wikipedia.org"
|
||||
api_url = f"https://{wiki_hostname}/w/api.php"
|
||||
|
||||
|
|
@ -27,7 +29,7 @@ def get_edit_proxy() -> dict[str, str]:
|
|||
return {}
|
||||
|
||||
|
||||
def api_post_request(params: dict[str, str | int]) -> requests.Response:
|
||||
def api_post_request(params: dict[str, str | int], timeout: int = 4) -> requests.Response:
|
||||
"""HTTP Post using Oauth."""
|
||||
app = current_app
|
||||
# url = "https://www.wikidata.org/w/api.php"
|
||||
|
|
@ -39,8 +41,9 @@ def api_post_request(params: dict[str, str | int]) -> requests.Response:
|
|||
resource_owner_key=session["owner_key"],
|
||||
resource_owner_secret=session["owner_secret"],
|
||||
)
|
||||
oauth.headers.update({"User-Agent": ua})
|
||||
proxies = get_edit_proxy()
|
||||
return oauth.post(api_url, data=params, timeout=4, proxies=proxies)
|
||||
return oauth.post(api_url, data=params, timeout=timeout, proxies=proxies)
|
||||
|
||||
|
||||
def raw_request(params: typing.Mapping[str, str | int]) -> requests.Response:
|
||||
|
|
@ -57,6 +60,7 @@ def raw_request(params: typing.Mapping[str, str | int]) -> requests.Response:
|
|||
resource_owner_key=session["owner_key"],
|
||||
resource_owner_secret=session["owner_secret"],
|
||||
)
|
||||
oauth.headers.update({"User-Agent": ua})
|
||||
proxies = get_edit_proxy()
|
||||
return oauth.get(
|
||||
api_url + "?" + urllib.parse.urlencode(params), timeout=4, proxies=proxies
|
||||
|
|
|
|||
33
web_view.py
33
web_view.py
|
|
@ -186,6 +186,7 @@ def start_oauth() -> Response:
|
|||
request_token_url = wiki_index_php + "?title=Special%3aOAuth%2finitiate"
|
||||
|
||||
oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob")
|
||||
oauth.headers.update({"User-Agent": api.ua})
|
||||
fetch_response = oauth.fetch_request_token(request_token_url)
|
||||
|
||||
flask.session["owner_key"] = fetch_response.get("oauth_token")
|
||||
|
|
@ -223,7 +224,7 @@ def oauth_callback() -> werkzeug.wrappers.response.Response:
|
|||
resource_owner_secret=flask.session["owner_secret"],
|
||||
verifier=verifier,
|
||||
)
|
||||
|
||||
oauth.headers.update({"User-Agent": api.ua})
|
||||
oauth_tokens = oauth.fetch_access_token(access_token_url)
|
||||
flask.session["owner_key"] = oauth_tokens.get("oauth_token")
|
||||
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
||||
|
|
@ -298,27 +299,37 @@ def get_best_hit(title: str, hits: list[Hit]) -> tuple[Hit, dict[str, typing.Any
|
|||
except NoMatch:
|
||||
print("no match")
|
||||
continue
|
||||
except api.MediawikiError as e:
|
||||
print(f"MediawikiError for {hit['title']!r}: {e}")
|
||||
continue
|
||||
|
||||
return (hit, found)
|
||||
|
||||
raise NoGoodHit
|
||||
|
||||
|
||||
def handle_post(url_title: str) -> Response:
|
||||
"""Handle POST request."""
|
||||
from_title = url_title.replace("_", " ").strip()
|
||||
hit_title = flask.request.form["hit"]
|
||||
try:
|
||||
do_save(from_title, hit_title)
|
||||
except mediawiki_oauth.LoginNeeded:
|
||||
return flask.redirect(flask.url_for("start_oauth"))
|
||||
except mediawiki_api.APIError as e:
|
||||
return flask.make_response(f"Save failed: {e}", 502)
|
||||
return flask.redirect(
|
||||
flask.url_for("article_page", url_title=url_title, after=hit_title)
|
||||
)
|
||||
|
||||
|
||||
@app.route("/link/<path:url_title>", methods=["GET", "POST"])
|
||||
def article_page(url_title: str) -> str | Response:
|
||||
"""Article page."""
|
||||
from_title = url_title.replace("_", " ").strip()
|
||||
|
||||
if flask.request.method == "POST":
|
||||
hit_title = flask.request.form["hit"]
|
||||
try:
|
||||
do_save(from_title, hit_title)
|
||||
except mediawiki_oauth.LoginNeeded:
|
||||
return flask.redirect(flask.url_for("start_oauth"))
|
||||
return flask.redirect(
|
||||
flask.url_for("article_page", url_title=url_title, after=hit_title)
|
||||
)
|
||||
return handle_post(url_title)
|
||||
|
||||
from_title = url_title.replace("_", " ").strip()
|
||||
article_title = flask.request.args.get("title")
|
||||
|
||||
total = search_count(from_title)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue