Large categories like "Living people" (900k+ articles) were impractical because the code tried to download all members before displaying results. Now stops after collecting ~200 articles and provides a "Next page" link. Also fixes the MediaWiki API continuation protocol: passes the full continue dict (not just gcmcontinue) so imcontinue responses are handled properly, and reduces gcmlimit from "max" to 50 so each batch's images fit in one API response. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
873 lines
26 KiB
Python
Executable file
873 lines
26 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""Find photos on flickr for Wikipedia articles and contact the photographer."""
|
|
|
|
import collections
|
|
import dataclasses
|
|
import inspect
|
|
import json
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import typing
|
|
from urllib.parse import quote, unquote
|
|
|
|
import flask
|
|
import requests
|
|
import werkzeug
|
|
from sqlalchemy import func
|
|
from werkzeug.debug.tbtools import DebugTraceback
|
|
|
|
from flickr_mail.database import get_session
|
|
from flickr_mail.models import FlickrUpload, InteractionLog, SentMessage, ThumbnailCache
|
|
from flickr_mail.url_utils import extract_urls_from_message, normalize_flickr_url
|
|
|
|
import re
|
|
|
|
|
|
app = flask.Flask(__name__)

app.debug = False

# Prefix used to recognise English Wikipedia article URLs in user input.
enwiki = "en.wikipedia.org/wiki/"

COMMONS_CACHE_MAX_AGE = 86400 * 7  # Cache for 7 days

# Number of recent Commons uploads shown on the front page.
RECENT_UPLOADS_COUNT = 24

# User agent for Wikimedia API requests
WIKIMEDIA_USER_AGENT = (
    "FlickrMail/1.0 (https://edwardbetts.com/flickr_mail/; edward@4angle.com)"
)

WIKIPEDIA_API = "https://en.wikipedia.org/w/api.php"

# Browser-like headers for Flickr requests
# NOTE(review): presumably needed because Flickr blocks or alters responses
# for non-browser user agents — confirm before changing.
BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "Cache-Control": "max-age=0",
}


# Flickr license codes to human-readable names
FLICKR_LICENSES = {
    0: "All Rights Reserved",
    1: "CC BY-NC-SA 2.0",
    2: "CC BY-NC 2.0",
    3: "CC BY-NC-ND 2.0",
    4: "CC BY 2.0",
    5: "CC BY-SA 2.0",
    6: "CC BY-ND 2.0",
    7: "No known copyright",
    8: "US Government",
    9: "CC0",
    10: "Public Domain",
    # CC 4.0 licenses (codes confirmed via Flickr)
    16: "CC BY-NC-ND 4.0",
}

# Non-free CC licenses (NC or ND restrictions)
# NOTE(review): codes 11-13 appear here but have no entry in FLICKR_LICENSES
# above — presumably CC 4.0 NC/ND variants; confirm against Flickr's API.
NONFREE_CC_LICENSES = {1, 2, 3, 6, 11, 12, 13, 16}

# Wikipedia-compatible free licenses
# NOTE(review): codes 14-15 likewise have no FLICKR_LICENSES entry — verify.
FREE_LICENSES = {4, 5, 7, 8, 9, 10, 14, 15}


# Number of results Flickr returns per search page; used for page-count math.
PHOTOS_PER_PAGE = 25
|
|
|
|
|
|
@dataclasses.dataclass
class FlickrPhoto:
    """A single Flickr photo extracted from a search results page."""

    id: str
    title: str
    path_alias: str
    owner_nsid: str
    username: str
    realname: str
    license: int
    thumb_url: str
    medium_url: str

    @property
    def flickr_url(self) -> str:
        """URL to the photo page on Flickr."""
        return "https://flickr.com/photos/" + self.path_alias + "/" + self.id

    @property
    def license_name(self) -> str:
        """Human-readable license name, falling back to the raw code."""
        try:
            return FLICKR_LICENSES[self.license]
        except KeyError:
            return f"License {self.license}"
|
|
|
|
|
|
@dataclasses.dataclass
class SearchResult:
    """Flickr search results with pagination metadata.

    total_pages is derived from total_photos, capped at Flickr's 4000-result
    limit (see parse_flickr_search_results).
    """

    photos: list[FlickrPhoto]
    total_photos: int
    current_page: int
    total_pages: int
|
|
|
|
|
|
@dataclasses.dataclass
class CommonsUpload:
    """A recent upload to Wikimedia Commons, ready for display."""

    title: str
    thumb_url: str
    commons_url: str
    flickr_url: str
    creator: str
    timestamp: str
    wikipedia_url: str = ""
    creator_profile_url: str = ""

    @property
    def is_wikidata_item(self) -> bool:
        """True when wikipedia_url actually points at a Wikidata Q item."""
        if not self.wikipedia_url:
            return False
        # Wikidata item URLs end in /Q<digits>.
        return re.search(r"/Q\d+$", self.wikipedia_url) is not None

    @property
    def wiki_link_url(self) -> str:
        """URL to link to: Wikidata for Q items, otherwise the Wikipedia URL."""
        url = self.wikipedia_url
        if not url:
            return ""
        if not self.is_wikidata_item:
            return url
        match = re.search(r"(Q\d+)$", url)
        if match is None:
            return url
        return f"https://www.wikidata.org/wiki/{match.group(1)}"

    @property
    def wiki_link_label(self) -> str:
        """Display label matching wiki_link_url."""
        if self.is_wikidata_item:
            return "Wikidata item"
        return "Wikipedia article"
|
|
|
|
|
|
|
|
def fetch_commons_thumbnails(titles: list[str]) -> dict[str, str]:
    """Fetch thumbnail URLs from Commons API for the given file titles.

    The Commons API accepts at most 50 titles per request, so longer lists
    are fetched in chunks of 50 (the original code sent them all at once and
    silently lost everything past the limit). Failed chunks are skipped, so
    the result may be partial.

    Returns a mapping of file title -> 150px thumbnail URL; titles with no
    image info or no thumbnail are omitted.
    """
    if not titles:
        return {}

    headers = {"User-Agent": WIKIMEDIA_USER_AGENT}
    thumbnails: dict[str, str] = {}

    # Commons API allows up to 50 titles per request
    for start in range(0, len(titles), 50):
        chunk = titles[start : start + 50]
        params = {
            "action": "query",
            "titles": "|".join(chunk),
            "prop": "imageinfo",
            "iiprop": "url",
            "iiurlwidth": 150,
            "format": "json",
        }

        try:
            response = requests.get(
                "https://commons.wikimedia.org/w/api.php",
                params=params,
                headers=headers,
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, json.JSONDecodeError):
            # Best effort: skip this chunk but keep results from the others.
            continue

        pages = data.get("query", {}).get("pages", {})
        for page in pages.values():
            title = page.get("title", "")
            imageinfo = page.get("imageinfo", [])
            if imageinfo:
                thumb_url = imageinfo[0].get("thumburl", "")
                if thumb_url:
                    thumbnails[title] = thumb_url

    return thumbnails
|
|
|
|
|
|
def get_recent_commons_uploads() -> tuple[list[CommonsUpload], int]:
    """Get recent Commons uploads with thumbnails, filtered to those contacted via Flickr mail.

    Returns a tuple of (uploads_list, total_count) where total_count is the total number
    of uploads obtained via Flickr mail (not just the ones returned).
    """
    session = get_session()
    try:
        # Uploads that have a matching SentMessage row, newest first.
        query = (
            session.query(FlickrUpload, SentMessage)
            .join(SentMessage)
            .order_by(FlickrUpload.timestamp.desc())
        )
        total_matched = query.count()
        if total_matched == 0:
            return [], 0

        recent = query.limit(RECENT_UPLOADS_COUNT).all()

        # Get thumbnails from cache
        titles = [upload.title for upload, msg in recent]
        now = int(time.time())
        cached = {
            tc.title: tc
            for tc in session.query(ThumbnailCache)
            .filter(ThumbnailCache.title.in_(titles))
            .all()
        }

        # Find titles needing fetch (missing or expired)
        titles_to_fetch = [
            t for t in titles
            if t not in cached or (now - (cached[t].fetched_at or 0)) > COMMONS_CACHE_MAX_AGE
        ]

        if titles_to_fetch:
            new_thumbs = fetch_commons_thumbnails(titles_to_fetch)
            for title, thumb_url in new_thumbs.items():
                existing = cached.get(title)
                if existing:
                    # Refresh the stale cache row in place.
                    existing.thumb_url = thumb_url
                    existing.fetched_at = now
                else:
                    tc = ThumbnailCache(title=title, thumb_url=thumb_url, fetched_at=now)
                    session.add(tc)
                    cached[title] = tc
            session.commit()

        result: list[CommonsUpload] = []
        for upload, msg in recent:
            thumb_url = cached[upload.title].thumb_url if upload.title in cached else ""
            # Skip uploads we could not get a thumbnail for (fetch failed or
            # the file no longer exists on Commons).
            if not thumb_url:
                continue

            commons_url = f"https://commons.wikimedia.org/wiki/{upload.title.replace(' ', '_')}"

            result.append(
                CommonsUpload(
                    # Strip the "File:" prefix and file extension for display.
                    title=upload.title.replace("File:", "").rsplit(".", 1)[0],
                    thumb_url=thumb_url,
                    commons_url=commons_url,
                    flickr_url=upload.flickr_url or "",
                    creator=upload.creator or "Unknown",
                    # Keep only the leading date portion of the timestamp.
                    timestamp=(upload.timestamp or "")[:10],
                    wikipedia_url=upload.wikipedia_url or "",
                    creator_profile_url=upload.creator_profile_url or "",
                )
            )

        return result, total_matched
    finally:
        session.close()
|
|
|
|
|
|
def get_previous_messages(flickr_user: str, flickr_username: str) -> list[dict]:
    """Get previous messages sent to a Flickr user.

    Checks both the display name (flickr_user) and username (flickr_username)
    against the recipient field in the messages index.
    """
    # Compare case-insensitively against whichever names are non-empty.
    names = {n.lower() for n in (flickr_user, flickr_username) if n}
    if not names:
        return []

    session = get_session()
    try:
        rows = (
            session.query(SentMessage)
            .filter(func.lower(SentMessage.recipient).in_(names))
            .all()
        )
        return [
            {
                "message_id": row.message_id,
                "subject": row.subject,
                "url": row.url,
                "recipient": row.recipient,
                "date": row.date,
            }
            for row in rows
        ]
    finally:
        session.close()
|
|
|
|
|
|
def parse_category_input(category_input: str) -> str | None:
|
|
"""Parse category title from URL or direct input.
|
|
|
|
Returns the category title with 'Category:' prefix, or None if invalid.
|
|
"""
|
|
category_input = category_input.strip()
|
|
|
|
# Handle URL format: https://en.wikipedia.org/wiki/Category:Example
|
|
if "wikipedia.org" in category_input:
|
|
match = re.search(r"/wiki/(Category:[^#?]+)", category_input)
|
|
if match:
|
|
return unquote(match.group(1)).replace("_", " ")
|
|
return None
|
|
|
|
# Handle direct input - add Category: prefix if missing
|
|
if category_input.startswith("Category:"):
|
|
return category_input.replace("_", " ")
|
|
|
|
# Assume it's just the category name
|
|
return f"Category:{category_input.replace('_', ' ')}"
|
|
|
|
|
|
@dataclasses.dataclass
class ArticleWithoutImage:
    """A Wikipedia article that needs an image."""

    title: str
    pageid: int

    @property
    def wikipedia_url(self) -> str:
        """URL to the Wikipedia article."""
        slug = self.title.replace(" ", "_")
        return f"https://en.wikipedia.org/wiki/{slug}"

    @property
    def search_url(self) -> str:
        """Relative URL that searches Flickr Mail for this article."""
        return "/?enwp=" + quote(self.title)
|
|
|
|
|
|
@dataclasses.dataclass
class CategoryResult:
    """Result of a paginated category search.

    gcmcontinue holds the MediaWiki continuation token for fetching the next
    page of category members, or None when the category is exhausted.
    """

    articles: list[ArticleWithoutImage]
    gcmcontinue: str | None
|
|
|
|
|
|
# Common non-content images to ignore when checking if an article has images
NON_CONTENT_IMAGE_PATTERNS = [
    "OOjs UI icon",
    "Commons-logo",
    "Symbol ",
    "Edit-ltr",
    "Ambox ",
    "Question book",
    "Wiki letter",
    "Text document",
    "Folder ",
    "Crystal ",
    "Nuvola ",
    "Gnome-",
    "Disambig ",
    "DAB ",
]


def has_content_image(images: list[dict]) -> bool:
    """Check if an article has a content image (not just UI icons/logos).

    An image counts as content when its title contains none of the known
    boilerplate patterns above.
    """
    return any(
        all(pattern not in img.get("title", "") for pattern in NON_CONTENT_IMAGE_PATTERNS)
        for img in images
    )
|
|
|
|
|
|
def get_articles_without_images(
    category: str,
    limit: int = 200,
    gcmcontinue: str | None = None,
) -> CategoryResult:
    """Get articles in a category that don't have images.

    Uses generator=categorymembers with prop=images to efficiently check
    multiple articles in a single API request, following continuation until
    the limit is reached or all category members have been processed.

    Fix over the previous version: when ``imlimit`` splits a batch, later
    responses repeat the same pages with MORE images (imcontinue). The old
    code classified each page on first sight and skipped repeats, so images
    arriving in continued responses were discarded and pages with truncated
    image lists could be wrongly reported as imageless. Images are now
    accumulated per pageid and a page is classified only once the API signals
    the batch is complete ("batchcomplete" in the response).
    """
    params = {
        "action": "query",
        "generator": "categorymembers",
        "gcmtitle": category,
        "gcmtype": "page",  # Only articles, not subcategories or files
        "gcmnamespace": "0",  # Main namespace only
        "gcmlimit": "50",  # Small batches so images fit in one response
        "prop": "images",
        "imlimit": "max",
        "format": "json",
    }

    headers = {"User-Agent": WIKIMEDIA_USER_AGENT}
    articles_without_images: list[ArticleWithoutImage] = []
    seen_pageids: set[int] = set()
    next_gcmcontinue: str | None = None

    # Images accumulated for the current generator batch, keyed by pageid.
    # Each value is (title, images); the images list grows across imcontinue
    # responses until the batch is complete.
    batch: dict[int, tuple[str, list[dict]]] = {}

    def flush_batch() -> None:
        """Classify accumulated pages now that their image lists are complete."""
        for pageid, (title, images) in batch.items():
            if pageid in seen_pageids:
                continue
            seen_pageids.add(pageid)
            # Skip if page has content images (not just UI icons)
            if has_content_image(images):
                continue
            if title:
                articles_without_images.append(
                    ArticleWithoutImage(title=title, pageid=pageid)
                )
        batch.clear()

    # Build initial continue params from the external pagination token
    continue_params: dict[str, str] = {}
    if gcmcontinue:
        continue_params = {"gcmcontinue": gcmcontinue, "continue": "gcmcontinue||"}

    while True:
        request_params = params.copy()
        request_params.update(continue_params)

        try:
            response = requests.get(
                WIKIPEDIA_API, params=request_params, headers=headers, timeout=30
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, json.JSONDecodeError) as e:
            print(f"Wikipedia API error: {e}")
            break

        for page in data.get("query", {}).get("pages", {}).values():
            pageid = page.get("pageid", 0)
            if not pageid:
                continue
            # setdefault keeps the title from the first occurrence and lets
            # continued responses extend the same image list.
            _, images = batch.setdefault(pageid, (page.get("title", ""), []))
            images.extend(page.get("images", []))

        # "batchcomplete" marks that prop=images data for the current batch
        # of generator results is complete; only then is classification safe.
        batch_complete = "batchcomplete" in data
        if batch_complete:
            flush_batch()

        api_continue = data.get("continue")
        if not api_continue:
            # Defensive: classify whatever remains if batchcomplete was absent.
            flush_batch()
            break

        # Only stop at generator boundaries where we have a resumable token
        # and the current batch has been fully classified.
        gcmc = api_continue.get("gcmcontinue")
        if batch_complete and gcmc and len(articles_without_images) >= limit:
            next_gcmcontinue = gcmc
            break

        # Pass the full continue dict so imcontinue responses are handled.
        continue_params = api_continue

    # Sort by title for consistent display
    articles_without_images.sort(key=lambda a: a.title)
    return CategoryResult(
        articles=articles_without_images,
        gcmcontinue=next_gcmcontinue,
    )
|
|
|
|
|
|
def is_valid_flickr_image_url(url: str) -> bool:
    """Check if URL is a valid Flickr static image URL."""
    # farmN.staticflickr.com hosts: accept any farm number, but insist on the
    # staticflickr.com domain so e.g. "https://farmshop.example/" is rejected.
    if url.startswith("https://farm"):
        return ".staticflickr.com/" in url

    # Remaining accepted hosts are checked as exact prefixes.
    exact_prefixes = (
        "https://live.staticflickr.com/",
        "https://c1.staticflickr.com/",
        "https://c2.staticflickr.com/",
    )
    return url.startswith(exact_prefixes)
|
|
|
|
|
|
def search_flickr(search_term: str, page: int = 1) -> SearchResult:
    """Search Flickr for photos matching the search term.

    The term is quoted so Flickr searches for the exact phrase. Raises
    requests.HTTPError on a non-2xx response.
    """
    encoded_term = quote(f'"{search_term}"')
    url = f"https://flickr.com/search/?view_all=1&text={encoded_term}&page={page}"

    # Timeout added: without it a hung Flickr request blocks the worker
    # forever (the Wikimedia fetches in this file all set timeouts).
    response = requests.get(url, headers=BROWSER_HEADERS, timeout=30)
    response.raise_for_status()

    return parse_flickr_search_results(response.text, page)
|
|
|
|
|
|
def parse_flickr_search_results(html: str, page: int = 1) -> SearchResult:
    """Parse Flickr search results HTML and extract photo data.

    Flickr embeds its view model in the page as a JavaScript object under a
    "modelExport:" key. This scans for that key, extracts the balanced JSON
    object that follows it by brace counting, and pulls the photo list out of
    the parsed structure. Returns an empty SearchResult on any parse failure.
    """
    empty_result = SearchResult(
        photos=[], total_photos=0, current_page=page, total_pages=0
    )

    # Find the modelExport JSON embedded in the page
    start = html.find("modelExport:")
    if start == -1:
        return empty_result

    start += len("modelExport:")
    while html[start].isspace():
        start += 1

    # Parse the JSON by counting braces. String/escape state is tracked so
    # braces inside string values don't throw the count off.
    brace_count = 0
    i = start
    in_string = False
    escape_next = False

    while i < len(html):
        char = html[i]

        if escape_next:
            # Previous char was a backslash inside a string: skip this one.
            escape_next = False
            i += 1
            continue

        if char == "\\" and in_string:
            escape_next = True
            i += 1
            continue

        if char == '"' and not escape_next:
            in_string = not in_string
        elif not in_string:
            if char == "{":
                brace_count += 1
            elif char == "}":
                brace_count -= 1
                if brace_count == 0:
                    # Matching close brace of the top-level object found.
                    json_str = html[start : i + 1]
                    break
        i += 1
    else:
        # Ran off the end of the document without balancing the braces.
        return empty_result

    try:
        data = json.loads(json_str)
    except json.JSONDecodeError:
        return empty_result

    # Extract photos from the parsed data
    photos: list[FlickrPhoto] = []

    main = data.get("main", {})
    photos_models = main.get("search-photos-lite-models", [])

    if not photos_models:
        return empty_result

    model_data = photos_models[0].get("data", {})
    photos_container = model_data.get("photos", {}).get("data", {})
    photos_data = photos_container.get("_data", [])
    total_photos = photos_container.get("totalItems", 0)

    # Calculate total pages (Flickr caps at 4000 results)
    total_pages = min(total_photos, 4000) // PHOTOS_PER_PAGE
    if min(total_photos, 4000) % PHOTOS_PER_PAGE:
        total_pages += 1

    for photo_entry in photos_data:
        # Skip None entries (placeholders from pagination)
        if photo_entry is None:
            continue

        pd = photo_entry.get("data", {})
        if not pd:
            continue

        # Size keys "q"/"sq" and "n"/"m" — presumably Flickr's square-thumb
        # and small-size codes; confirm against Flickr's size documentation.
        sizes = pd.get("sizes", {}).get("data", {})
        thumb_data = sizes.get("q", sizes.get("sq", {})).get("data", {})
        medium_data = sizes.get("n", sizes.get("m", {})).get("data", {})

        thumb_url = thumb_data.get("url", "")
        medium_url = medium_data.get("url", "")

        # Ensure URLs have protocol
        if thumb_url.startswith("//"):
            thumb_url = "https:" + thumb_url
        if medium_url.startswith("//"):
            medium_url = "https:" + medium_url

        photos.append(
            FlickrPhoto(
                id=str(pd.get("id", "")),
                title=pd.get("title", ""),
                path_alias=pd.get("pathAlias", ""),
                owner_nsid=pd.get("ownerNsid", ""),
                username=pd.get("username", ""),
                realname=pd.get("realname", ""),
                license=pd.get("license", 0),
                thumb_url=thumb_url,
                medium_url=medium_url,
            )
        )

    return SearchResult(
        photos=photos,
        total_photos=total_photos,
        current_page=page,
        total_pages=total_pages,
    )
|
|
|
|
|
|
def log_interaction(
    interaction_type: str,
    query: str | None = None,
    flickr_url: str | None = None,
    wikipedia_url: str | None = None,
) -> None:
    """Log a user interaction to the database."""
    # Behind a proxy, the first hop of X-Forwarded-For is the client address.
    forwarded = flask.request.headers.get("X-Forwarded-For")
    if forwarded:
        ip_address = forwarded.split(",")[0].strip()
    else:
        ip_address = flask.request.remote_addr

    session = get_session()
    try:
        session.add(
            InteractionLog(
                timestamp=int(time.time()),
                interaction_type=interaction_type,
                ip_address=ip_address,
                user_agent=flask.request.headers.get("User-Agent"),
                query=query,
                flickr_url=flickr_url,
                wikipedia_url=wikipedia_url,
            )
        )
        session.commit()
    finally:
        session.close()
|
|
|
|
|
|
@app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
    """Handle exception.

    Renders a custom error page built from werkzeug's debug traceback
    tooling. NOTE(review): relies on werkzeug internals (DebugTraceback._te),
    which may break across werkzeug upgrades — verify on dependency bumps.
    """
    exec_type, exc_value, current_traceback = sys.exc_info()
    # This handler only runs while an exception is being processed.
    assert exc_value
    tb = DebugTraceback(exc_value)

    summary = tb.render_traceback_html(include_title=False)
    # Just the "ExceptionType: message" line(s), without the stack.
    exc_lines = "".join(tb._te.format_exception_only())

    # Innermost frame where the exception was raised, plus its arg names.
    last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
    last_frame_args = inspect.getargs(last_frame.f_code)

    return (
        flask.render_template(
            "show_error.html",
            plaintext=tb.render_traceback_text(),
            exception=exc_lines,
            exception_type=tb._te.exc_type.__name__,
            summary=summary,
            last_frame=last_frame,
            last_frame_args=last_frame_args,
        ),
        500,
    )
|
|
|
|
|
|
@app.route("/")
def start() -> str:
    """Start form.

    One route drives the whole flow:
      1. No ``enwp`` parameter: render the start form with recent uploads.
      2. ``enwp`` but no ``flickr``: search Flickr for the article subject.
      3. ``flickr`` selected: show upload options (free license) or generate
         a message to send to the photographer.
    """
    enwp = flask.request.args.get("enwp")
    if not enwp:
        recent_uploads, total_uploads = get_recent_commons_uploads()
        return flask.render_template(
            "combined.html", recent_uploads=recent_uploads, total_uploads=total_uploads
        )
    enwp = enwp.strip()
    if not enwp:
        # Whitespace-only input: treat the same as no input.
        recent_uploads, total_uploads = get_recent_commons_uploads()
        return flask.render_template(
            "combined.html", recent_uploads=recent_uploads, total_uploads=total_uploads
        )

    # ``enwp`` may be a full article URL or a bare article title.
    input_is = "url" if enwiki in enwp else "title"

    wikipedia_url: str
    wiki_part1: str  # scheme + host + "/wiki/" prefix
    wiki_part2: str  # article title part of the URL
    if input_is == "url":
        # NOTE(review): local name shadows the function name ``start``.
        start = enwp.find(enwiki) + len(enwiki)
        wiki_part2 = unquote(enwp[start:])
        name = wiki_part2
        wiki_part1 = enwp[:start]
        wikipedia_url = enwp
    else:
        name = enwp
        wiki_part1 = "https://" + enwiki
        wiki_part2 = name.replace(" ", "_")
        wikipedia_url = wiki_part1 + wiki_part2

    # Remove disambiguation suffix like "(academic)" for Flickr search
    name = name.replace("_", " ")
    if " (" in name:
        name = name[: name.find(" (")]

    # Get category param if coming from category search
    cat = flask.request.args.get("cat")

    flickr_url = flask.request.args.get("flickr")
    if not flickr_url:
        # Search Flickr for photos
        page = flask.request.args.get("page", 1, type=int)
        page = max(1, page)  # Ensure page is at least 1
        if page == 1:
            # Only log the initial search, not pagination clicks.
            log_interaction("search_article", query=name, wikipedia_url=wikipedia_url)
        search_result = search_flickr(name, page)
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            search_result=search_result,
            cat=cat,
        )

    # Strip the "/in/..." context suffix from photo URLs so the canonical
    # photo page URL remains.
    if "/in/" in flickr_url:
        flickr_url = flickr_url[: flickr_url.find("/in/")]

    flickr_start = "https://flickr.com/photos/"

    # NOTE(review): assert used for request validation — stripped under -O,
    # and a failure surfaces as a 500 rather than a 4xx.
    assert flickr_url.startswith(flickr_start)
    flickr_username = flickr_url[
        len(flickr_start) : flickr_url.find("/", len(flickr_start))
    ]

    nsid = flickr_usrename_to_nsid(flickr_username)
    assert nsid
    print(nsid)  # debug output

    # Get optional image URL for display, validate it's from Flickr
    img_url = flask.request.args.get("img")
    if img_url and not is_valid_flickr_image_url(img_url):
        img_url = None

    # Get flickr_user name and build profile URL
    flickr_user = flask.request.args.get("flickr_user", "")
    flickr_user_url = f"https://www.flickr.com/photos/{flickr_username}/"

    # Check for previous messages to this user
    previous_messages = get_previous_messages(flickr_user, flickr_username)

    # Get license code if provided
    license_code = flask.request.args.get("license", type=int)
    license_name = (
        FLICKR_LICENSES.get(license_code, "") if license_code is not None else ""
    )

    is_free_license = license_code in FREE_LICENSES
    is_nonfree_cc = license_code in NONFREE_CC_LICENSES

    # For free licenses, show upload options instead of message
    if is_free_license:
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            flickr_url=flickr_url,
            img_url=img_url,
            license_code=license_code,
            license_name=license_name,
            is_free_license=True,
            wikipedia_url=wikipedia_url,
            flickr_user=flickr_user,
            flickr_user_url=flickr_user_url,
            cat=cat,
            previous_messages=previous_messages,
        )

    log_interaction(
        "generate_message",
        query=name,
        flickr_url=flickr_url,
        wikipedia_url=wikipedia_url,
    )

    # Render the message body for the photographer from a template.
    msg = flask.render_template(
        "message.jinja",
        flickr_url=flickr_url,
        enwp=enwp,
        wikipedia_url=wikipedia_url,
        name=name,
        wiki_part1=wiki_part1,
        wiki_part2=wiki_part2,
        is_nonfree_cc=is_nonfree_cc,
        license_name=license_name,
    )

    subject = f"Request to use your photo of {name} on Wikipedia"

    # Split the rendered message into paragraphs for display.
    lines = msg.split("\n\n")

    return flask.render_template(
        "combined.html",
        name=name,
        enwp=enwp,
        flickr_url=flickr_url,
        subject=subject,
        lines=lines,
        nsid=nsid,
        img_url=img_url,
        license_code=license_code,
        license_name=license_name,
        flickr_user=flickr_user,
        flickr_user_url=flickr_user_url,
        cat=cat,
        previous_messages=previous_messages,
    )
|
|
|
|
|
|
@app.route("/category")
def category_search() -> str:
    """Find articles in a Wikipedia category that need images."""
    cat = flask.request.args.get("cat", "").strip()
    if not cat:
        # No category supplied yet: show the search form.
        return flask.render_template("category.html")

    category = parse_category_input(cat)
    if category is None:
        return flask.render_template(
            "category.html",
            error="Invalid category format. Please enter a category name or URL.",
            cat=cat,
        )

    log_interaction("search_category", query=category)

    # Resume from a previous page when a continuation token was passed.
    token = flask.request.args.get("gcmcontinue") or None
    result = get_articles_without_images(category, gcmcontinue=token)

    return flask.render_template(
        "category.html",
        cat=cat,
        category=category,
        # Display name without the namespace prefix.
        category_name=category.replace("Category:", ""),
        articles=result.articles,
        gcmcontinue=result.gcmcontinue,
    )
|
|
|
|
|
|
def get_params(line_iter: collections.abc.Iterable[str]) -> str:
    """Find and return params from flickr profile page.

    Scans the page lines for the embedded "params:" object and returns the
    line content from its opening brace onward. Raises StopIteration when no
    matching line is found.
    """
    marker = 'params: {"isEditingTestimonial":false,'

    def candidates() -> collections.abc.Iterator[str]:
        for line in line_iter:
            if marker in line:
                yield line[line.find("{") :]

    return next(candidates())
|
|
|
|
|
|
def flickr_usrename_to_nsid(username: str) -> str:
    """Get NSID from flickr username.

    Fetches the user's profile page and extracts the "nsid" field from the
    embedded params object. Raises StopIteration if the params marker is not
    found and KeyError if the object has no "nsid".

    NOTE: the misspelling ("usrename") is kept because callers use this name.
    """
    url = f"https://www.flickr.com/people/{username}/"
    # Timeout added so a hung Flickr request cannot block the worker forever.
    r = requests.get(url, headers=BROWSER_HEADERS, timeout=30)
    params_str = get_params(r.text.splitlines())
    # raw_decode tolerates trailing characters after the JSON object.
    params, _ = json.JSONDecoder().raw_decode(params_str)
    return typing.cast(str, params["nsid"])
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point; binds all interfaces. Use a WSGI server in
    # production instead of the built-in Flask server.
    app.run(host="0.0.0.0")
|