Compare commits
No commits in common. "4fe0acc1675f291ff5453e517ca3fe8c91455be1" and "1be844e57a0bb74f913bd8be8a9acea12cceaf3b" have entirely different histories.
4fe0acc167
...
1be844e57a
5 changed files with 31 additions and 171 deletions
|
|
@ -272,10 +272,6 @@ def call_get_diff(title: str, section_num: int, section_text: str) -> str:
|
||||||
}
|
}
|
||||||
|
|
||||||
s = get_session()
|
s = get_session()
|
||||||
r = s.post(get_query_url(), data=data)
|
ret = s.post(get_query_url(), data=data).json()
|
||||||
try:
|
|
||||||
ret = r.json()
|
|
||||||
except JSONDecodeError:
|
|
||||||
raise MediawikiError(f"HTTP {r.status_code}: {r.text[:200]!r}")
|
|
||||||
check_for_error(ret)
|
check_for_error(ret)
|
||||||
return typing.cast(str, ret["query"]["pages"][0]["revisions"][0]["diff"]["body"])
|
return typing.cast(str, ret["query"]["pages"][0]["revisions"][0]["diff"]["body"])
|
||||||
|
|
|
||||||
|
|
@ -8,12 +8,6 @@ from .core import get_case_from_content, get_content_and_timestamp, get_revision
|
||||||
from .util import is_title_case, lc_alpha
|
from .util import is_title_case, lc_alpha
|
||||||
|
|
||||||
re_link_in_text = re.compile(r"\[\[[^]]+?\]\]", re.I | re.S)
|
re_link_in_text = re.compile(r"\[\[[^]]+?\]\]", re.I | re.S)
|
||||||
re_category_link = re.compile(r"\[\[Category:[^\]]+\]\]", re.I)
|
|
||||||
|
|
||||||
# Matches when extra words separate the match from a following (ABBREVIATION),
|
|
||||||
# indicating the matched text is just part of a longer named entity.
|
|
||||||
# e.g. "of Russia (AGMR)" matches; " (AGMR)" does not (no intervening words).
|
|
||||||
re_named_entity_abbrev = re.compile(r"^(?:\s+[^\s(]+){1,6}\s*\([A-Z]{2,}\)")
|
|
||||||
|
|
||||||
|
|
||||||
class LinkReplace(Exception):
|
class LinkReplace(Exception):
|
||||||
|
|
@ -78,62 +72,14 @@ re_cite = re.compile(
|
||||||
re.I | re.S,
|
re.I | re.S,
|
||||||
)
|
)
|
||||||
|
|
||||||
re_cite_template_start = re.compile(r"\{\{(?:cite|citation|short description|gli|defn)\b", re.I)
|
|
||||||
re_external_link = re.compile(r"\[https?://[^\]]+\]")
|
|
||||||
# Italic text (work titles in bibliographies). Handles apostrophes in content
|
|
||||||
# (e.g. ''It's fine'') but requires at least one non-apostrophe character so
|
|
||||||
# that ''' bold ''' is not consumed as italic.
|
|
||||||
re_italic = re.compile(r"''[^']+(?:'[^']+)*''")
|
|
||||||
# Bullet-point lines that contain a bare URL are unformatted bibliography entries.
|
|
||||||
re_bullet_with_url = re.compile(r"^\*[^\n]*https?://[^\s\n]+[^\n]*", re.MULTILINE)
|
|
||||||
|
|
||||||
|
|
||||||
def find_cite_template_spans(text: str) -> list[tuple[int, int]]:
|
|
||||||
"""Find (start, end) spans of {{Cite ...}} templates, handling nested braces."""
|
|
||||||
spans: list[tuple[int, int]] = []
|
|
||||||
for m in re_cite_template_start.finditer(text):
|
|
||||||
start = m.start()
|
|
||||||
if any(s <= start < e for s, e in spans):
|
|
||||||
continue # already inside a found span
|
|
||||||
depth = 0
|
|
||||||
i = start
|
|
||||||
while i < len(text):
|
|
||||||
if text[i : i + 2] == "{{":
|
|
||||||
depth += 1
|
|
||||||
i += 2
|
|
||||||
elif text[i : i + 2] == "}}":
|
|
||||||
depth -= 1
|
|
||||||
i += 2
|
|
||||||
if depth == 0:
|
|
||||||
spans.append((start, i))
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
i += 1
|
|
||||||
return spans
|
|
||||||
|
|
||||||
|
|
||||||
def parse_cite(text: str) -> typing.Iterator[tuple[str, str]]:
|
def parse_cite(text: str) -> typing.Iterator[tuple[str, str]]:
|
||||||
"""Parse citations yielding (type, chunk) tuples, skipping ref tags, cite templates, and external links."""
|
"""Parse a citation template."""
|
||||||
regions = [(m.start(), m.end()) for m in re_cite.finditer(text)]
|
|
||||||
regions.extend(find_cite_template_spans(text))
|
|
||||||
regions.extend((m.start(), m.end()) for m in re_external_link.finditer(text))
|
|
||||||
regions.extend((m.start(), m.end()) for m in re_italic.finditer(text))
|
|
||||||
regions.extend((m.start(), m.end()) for m in re_bullet_with_url.finditer(text))
|
|
||||||
regions.sort()
|
|
||||||
|
|
||||||
# Merge overlapping regions (e.g. a {{Cite}} that sits inside a <ref>)
|
|
||||||
merged: list[tuple[int, int]] = []
|
|
||||||
for start, end in regions:
|
|
||||||
if merged and start < merged[-1][1]:
|
|
||||||
merged[-1] = (merged[-1][0], max(merged[-1][1], end))
|
|
||||||
else:
|
|
||||||
merged.append((start, end))
|
|
||||||
|
|
||||||
prev = 0
|
prev = 0
|
||||||
for start, end in merged:
|
for m in re_cite.finditer(text):
|
||||||
yield ("text", text[prev:start])
|
yield ("text", text[prev : m.start()])
|
||||||
yield ("cite", text[start:end])
|
yield ("cite", m.group(0))
|
||||||
prev = end
|
prev = m.end()
|
||||||
yield ("text", text[prev:])
|
yield ("text", text[prev:])
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -217,8 +163,6 @@ def parse_links(text: str) -> typing.Iterator[tuple[str, str]]:
|
||||||
for prefix in ("file:", "image:")
|
for prefix in ("file:", "image:")
|
||||||
):
|
):
|
||||||
yield ("image", m.group(0))
|
yield ("image", m.group(0))
|
||||||
elif m.group().lower().startswith("[[category:"):
|
|
||||||
yield ("category", m.group(0))
|
|
||||||
else:
|
else:
|
||||||
yield ("link", m.group(0))
|
yield ("link", m.group(0))
|
||||||
prev = m.end()
|
prev = m.end()
|
||||||
|
|
@ -242,66 +186,12 @@ def mk_link_matcher(q: str) -> typing.Callable[[str], re.Match[str] | None]:
|
||||||
|
|
||||||
def add_link(m: re.Match[str], replacement: str, text: str) -> str:
|
def add_link(m: re.Match[str], replacement: str, text: str) -> str:
|
||||||
"""Add link to text."""
|
"""Add link to text."""
|
||||||
|
|
||||||
matched_text = m.group(0)
|
matched_text = m.group(0)
|
||||||
if matched_text.startswith("[[") and matched_text.endswith("|"):
|
if matched_text.startswith("[[") and matched_text.endswith("|"):
|
||||||
return m.re.sub(lambda m: f"[[{replacement}|", text, count=1)
|
return m.re.sub(lambda m: f"[[{replacement}|", text, count=1)
|
||||||
|
else:
|
||||||
inner_bracket = matched_text.find("[[")
|
return m.re.sub(lambda m: f"[[{replacement}]]", text, count=1)
|
||||||
if inner_bracket > 0:
|
|
||||||
prefix = matched_text[:inner_bracket].rstrip()
|
|
||||||
sep = matched_text[len(prefix) : inner_bracket]
|
|
||||||
suffix = matched_text[inner_bracket:]
|
|
||||||
link_dest = replacement.split("|")[0] if "|" in replacement else replacement
|
|
||||||
if text[m.end() : m.end() + 2] == "]]":
|
|
||||||
# The existing [[link]] is fully consumed by the match (its closing ]]
|
|
||||||
# immediately follows). Replace everything with a single clean link.
|
|
||||||
# e.g. "surface [[runoff (hydrology)|runoff]]" → "[[surface runoff]]"
|
|
||||||
return text[: m.start()] + f"[[{link_dest}]]" + text[m.end() + 2 :]
|
|
||||||
# The existing link is only partially consumed; keep it and link just the prefix.
|
|
||||||
# e.g. "cross-platform [[interchange station]]"
|
|
||||||
# → "[[cross-platform interchange|cross-platform]] [[interchange station]]"
|
|
||||||
return text[: m.start()] + f"[[{link_dest}|{prefix}]]{sep}{suffix}" + text[m.end() :]
|
|
||||||
|
|
||||||
close_bracket = matched_text.find("]]")
|
|
||||||
if close_bracket > 0 and m.start() >= 2 and text[m.start() - 2 : m.start()] == "[[":
|
|
||||||
# Match started inside an existing [[link]] and spans beyond its closing ]].
|
|
||||||
# e.g. "[[anti-globalization]] movement" matched as "anti-globalization]] movement"
|
|
||||||
# → absorb the leading [[ and produce "[[anti-globalization movement]]"
|
|
||||||
return text[: m.start() - 2] + f"[[{replacement}]]" + text[m.end() :]
|
|
||||||
|
|
||||||
return m.re.sub(lambda m: f"[[{replacement}]]", text, count=1)
|
|
||||||
|
|
||||||
|
|
||||||
def match_spans_existing_link(m: re.Match[str], text: str) -> bool:
|
|
||||||
"""Return True if the match starts partway inside an existing [[link]].
|
|
||||||
|
|
||||||
The trans2 space pattern allows matching across ]] boundaries, which is
|
|
||||||
intentional for cases like [[anti-globalization]] movement (match starts
|
|
||||||
right after [[). But when the match starts *inside* a link (e.g.
|
|
||||||
[[impervious surface]] runoff matching 'surface runoff'), absorbing the
|
|
||||||
brackets would produce broken wikitext — skip those matches instead.
|
|
||||||
"""
|
|
||||||
close_bracket = m.group(0).find("]]")
|
|
||||||
if close_bracket <= 0:
|
|
||||||
return False
|
|
||||||
# close_bracket > 0: match spans ][. If it starts right after [[ that
|
|
||||||
# opening bracket is absorbed by add_link — not our problem here.
|
|
||||||
if m.start() >= 2 and text[m.start() - 2 : m.start()] == "[[":
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def is_part_of_named_entity(m: re.Match[str], text: str) -> bool:
|
|
||||||
"""Return True if the match is a title-case fragment of a longer named entity.
|
|
||||||
|
|
||||||
Detects the pattern: matched text (title case) + extra words + (ABBREVIATION).
|
|
||||||
e.g. "Anti-Globalization Movement" followed by "of Russia (AGMR)" → True.
|
|
||||||
But "Anti-Globalization Movement" followed directly by "(AGM)" → False,
|
|
||||||
since the match itself is the full entity name.
|
|
||||||
"""
|
|
||||||
if not m.group(0)[0].isupper():
|
|
||||||
return False
|
|
||||||
return bool(re_named_entity_abbrev.match(text[m.end() :]))
|
|
||||||
|
|
||||||
|
|
||||||
def find_link_in_chunk(
|
def find_link_in_chunk(
|
||||||
|
|
@ -362,9 +252,8 @@ def find_link_in_chunk(
|
||||||
if not replacement:
|
if not replacement:
|
||||||
if bad_link_match:
|
if bad_link_match:
|
||||||
raise LinkReplace
|
raise LinkReplace
|
||||||
masked = re_category_link.sub(lambda c: " " * len(c.group(0)), content)
|
m = search_for_link(content)
|
||||||
m = search_for_link(masked)
|
if m:
|
||||||
if m and not is_part_of_named_entity(m, content) and not match_spans_existing_link(m, content):
|
|
||||||
found_text_to_link = m.group(0)
|
found_text_to_link = m.group(0)
|
||||||
replacement = match_found(m, q, linkto)
|
replacement = match_found(m, q, linkto)
|
||||||
new_content = add_link(m, replacement, content)
|
new_content = add_link(m, replacement, content)
|
||||||
|
|
@ -453,9 +342,8 @@ def find_link_and_section(q: str, content: str, linkto: str | None = None):
|
||||||
if replacement:
|
if replacement:
|
||||||
text = new_text
|
text = new_text
|
||||||
else:
|
else:
|
||||||
masked = re_category_link.sub(lambda c: " " * len(c.group(0)), text)
|
m = search_for_link(text)
|
||||||
m = search_for_link(masked)
|
if m:
|
||||||
if m and not is_part_of_named_entity(m, text) and not match_spans_existing_link(m, text):
|
|
||||||
replacement = match_found(m, q, linkto)
|
replacement = match_found(m, q, linkto)
|
||||||
text = add_link(m, replacement, text)
|
text = add_link(m, replacement, text)
|
||||||
new_content += text
|
new_content += text
|
||||||
|
|
|
||||||
|
|
@ -4,14 +4,8 @@ import typing
|
||||||
from pprint import pprint
|
from pprint import pprint
|
||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from . import mediawiki_oauth
|
from . import mediawiki_oauth
|
||||||
|
|
||||||
|
|
||||||
class APIError(Exception):
|
|
||||||
"""Unexpected response from the MediaWiki API."""
|
|
||||||
|
|
||||||
wiki_hostname = "en.wikipedia.org"
|
wiki_hostname = "en.wikipedia.org"
|
||||||
wiki_api_php = f"https://{wiki_hostname}/w/api.php"
|
wiki_api_php = f"https://{wiki_hostname}/w/api.php"
|
||||||
user_agent = "add-links/0.1"
|
user_agent = "add-links/0.1"
|
||||||
|
|
@ -33,13 +27,10 @@ def parse_page(enwiki: str) -> dict[str, Any]:
|
||||||
return parse
|
return parse
|
||||||
|
|
||||||
|
|
||||||
def call(params: dict[str, str | int], timeout: int = 4) -> dict[str, typing.Any]:
|
def call(params: dict[str, str | int]) -> dict[str, typing.Any]:
|
||||||
"""Make GET request to mediawiki API."""
|
"""Make GET request to mediawiki API."""
|
||||||
r = mediawiki_oauth.api_post_request(params, timeout=timeout)
|
data = mediawiki_oauth.api_post_request(params)
|
||||||
try:
|
return cast(dict[str, Any], data.json())
|
||||||
return cast(dict[str, Any], r.json())
|
|
||||||
except requests.exceptions.JSONDecodeError:
|
|
||||||
raise APIError(f"HTTP {r.status_code}: {r.text[:200]!r}")
|
|
||||||
|
|
||||||
|
|
||||||
def article_exists(title: str) -> bool:
|
def article_exists(title: str) -> bool:
|
||||||
|
|
@ -101,7 +92,7 @@ def edit_page(
|
||||||
"summary": summary,
|
"summary": summary,
|
||||||
"section": section,
|
"section": section,
|
||||||
}
|
}
|
||||||
ret = call(params, timeout=30)
|
ret = call(params)
|
||||||
if "edit" not in ret:
|
if "edit" not in ret:
|
||||||
print("params")
|
print("params")
|
||||||
pprint(params)
|
pprint(params)
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,6 @@ import requests
|
||||||
from flask import current_app, session
|
from flask import current_app, session
|
||||||
from requests_oauthlib import OAuth1Session
|
from requests_oauthlib import OAuth1Session
|
||||||
|
|
||||||
from .api import ua
|
|
||||||
|
|
||||||
wiki_hostname = "en.wikipedia.org"
|
wiki_hostname = "en.wikipedia.org"
|
||||||
api_url = f"https://{wiki_hostname}/w/api.php"
|
api_url = f"https://{wiki_hostname}/w/api.php"
|
||||||
|
|
||||||
|
|
@ -29,7 +27,7 @@ def get_edit_proxy() -> dict[str, str]:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def api_post_request(params: dict[str, str | int], timeout: int = 4) -> requests.Response:
|
def api_post_request(params: dict[str, str | int]) -> requests.Response:
|
||||||
"""HTTP Post using Oauth."""
|
"""HTTP Post using Oauth."""
|
||||||
app = current_app
|
app = current_app
|
||||||
# url = "https://www.wikidata.org/w/api.php"
|
# url = "https://www.wikidata.org/w/api.php"
|
||||||
|
|
@ -41,9 +39,8 @@ def api_post_request(params: dict[str, str | int], timeout: int = 4) -> requests
|
||||||
resource_owner_key=session["owner_key"],
|
resource_owner_key=session["owner_key"],
|
||||||
resource_owner_secret=session["owner_secret"],
|
resource_owner_secret=session["owner_secret"],
|
||||||
)
|
)
|
||||||
oauth.headers.update({"User-Agent": ua})
|
|
||||||
proxies = get_edit_proxy()
|
proxies = get_edit_proxy()
|
||||||
return oauth.post(api_url, data=params, timeout=timeout, proxies=proxies)
|
return oauth.post(api_url, data=params, timeout=4, proxies=proxies)
|
||||||
|
|
||||||
|
|
||||||
def raw_request(params: typing.Mapping[str, str | int]) -> requests.Response:
|
def raw_request(params: typing.Mapping[str, str | int]) -> requests.Response:
|
||||||
|
|
@ -60,7 +57,6 @@ def raw_request(params: typing.Mapping[str, str | int]) -> requests.Response:
|
||||||
resource_owner_key=session["owner_key"],
|
resource_owner_key=session["owner_key"],
|
||||||
resource_owner_secret=session["owner_secret"],
|
resource_owner_secret=session["owner_secret"],
|
||||||
)
|
)
|
||||||
oauth.headers.update({"User-Agent": ua})
|
|
||||||
proxies = get_edit_proxy()
|
proxies = get_edit_proxy()
|
||||||
return oauth.get(
|
return oauth.get(
|
||||||
api_url + "?" + urllib.parse.urlencode(params), timeout=4, proxies=proxies
|
api_url + "?" + urllib.parse.urlencode(params), timeout=4, proxies=proxies
|
||||||
|
|
|
||||||
35
web_view.py
35
web_view.py
|
|
@ -186,7 +186,6 @@ def start_oauth() -> Response:
|
||||||
request_token_url = wiki_index_php + "?title=Special%3aOAuth%2finitiate"
|
request_token_url = wiki_index_php + "?title=Special%3aOAuth%2finitiate"
|
||||||
|
|
||||||
oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob")
|
oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob")
|
||||||
oauth.headers.update({"User-Agent": api.ua})
|
|
||||||
fetch_response = oauth.fetch_request_token(request_token_url)
|
fetch_response = oauth.fetch_request_token(request_token_url)
|
||||||
|
|
||||||
flask.session["owner_key"] = fetch_response.get("oauth_token")
|
flask.session["owner_key"] = fetch_response.get("oauth_token")
|
||||||
|
|
@ -224,7 +223,7 @@ def oauth_callback() -> werkzeug.wrappers.response.Response:
|
||||||
resource_owner_secret=flask.session["owner_secret"],
|
resource_owner_secret=flask.session["owner_secret"],
|
||||||
verifier=verifier,
|
verifier=verifier,
|
||||||
)
|
)
|
||||||
oauth.headers.update({"User-Agent": api.ua})
|
|
||||||
oauth_tokens = oauth.fetch_access_token(access_token_url)
|
oauth_tokens = oauth.fetch_access_token(access_token_url)
|
||||||
flask.session["owner_key"] = oauth_tokens.get("oauth_token")
|
flask.session["owner_key"] = oauth_tokens.get("oauth_token")
|
||||||
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
flask.session["owner_secret"] = oauth_tokens.get("oauth_token_secret")
|
||||||
|
|
@ -299,37 +298,27 @@ def get_best_hit(title: str, hits: list[Hit]) -> tuple[Hit, dict[str, typing.Any
|
||||||
except NoMatch:
|
except NoMatch:
|
||||||
print("no match")
|
print("no match")
|
||||||
continue
|
continue
|
||||||
except api.MediawikiError as e:
|
|
||||||
print(f"MediawikiError for {hit['title']!r}: {e}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
return (hit, found)
|
return (hit, found)
|
||||||
|
|
||||||
raise NoGoodHit
|
raise NoGoodHit
|
||||||
|
|
||||||
|
|
||||||
def handle_post(url_title: str) -> Response:
|
|
||||||
"""Handle POST request."""
|
|
||||||
from_title = url_title.replace("_", " ").strip()
|
|
||||||
hit_title = flask.request.form["hit"]
|
|
||||||
try:
|
|
||||||
do_save(from_title, hit_title)
|
|
||||||
except mediawiki_oauth.LoginNeeded:
|
|
||||||
return flask.redirect(flask.url_for("start_oauth"))
|
|
||||||
except mediawiki_api.APIError as e:
|
|
||||||
return flask.make_response(f"Save failed: {e}", 502)
|
|
||||||
return flask.redirect(
|
|
||||||
flask.url_for("article_page", url_title=url_title, after=hit_title)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/link/<path:url_title>", methods=["GET", "POST"])
|
@app.route("/link/<path:url_title>", methods=["GET", "POST"])
|
||||||
def article_page(url_title: str) -> str | Response:
|
def article_page(url_title: str) -> str | Response:
|
||||||
"""Article page."""
|
"""Article page."""
|
||||||
if flask.request.method == "POST":
|
|
||||||
return handle_post(url_title)
|
|
||||||
|
|
||||||
from_title = url_title.replace("_", " ").strip()
|
from_title = url_title.replace("_", " ").strip()
|
||||||
|
|
||||||
|
if flask.request.method == "POST":
|
||||||
|
hit_title = flask.request.form["hit"]
|
||||||
|
try:
|
||||||
|
do_save(from_title, hit_title)
|
||||||
|
except mediawiki_oauth.LoginNeeded:
|
||||||
|
return flask.redirect(flask.url_for("start_oauth"))
|
||||||
|
return flask.redirect(
|
||||||
|
flask.url_for("article_page", url_title=url_title, after=hit_title)
|
||||||
|
)
|
||||||
|
|
||||||
article_title = flask.request.args.get("title")
|
article_title = flask.request.args.get("title")
|
||||||
|
|
||||||
total = search_count(from_title)
|
total = search_count(from_title)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue