Log searches (article/category) and message-generation events to a new interaction_log table, capturing IP address and User-Agent. Also apply NOT NULL constraints to Contribution, SentMessage, FlickrUpload, and ThumbnailCache fields that are always populated, and remove stale continue_token references from category.html. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
840 lines
25 KiB
Python
Executable file
840 lines
25 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""Find photos on flickr for Wikipedia articles and contact the photographer."""
|
|
|
|
import collections
|
|
import dataclasses
|
|
import inspect
|
|
import json
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import typing
|
|
from urllib.parse import quote, unquote
|
|
|
|
import flask
|
|
import requests
|
|
import werkzeug
|
|
from sqlalchemy import func
|
|
from werkzeug.debug.tbtools import DebugTraceback
|
|
|
|
from flickr_mail.database import get_session
|
|
from flickr_mail.models import FlickrUpload, InteractionLog, SentMessage, ThumbnailCache
|
|
from flickr_mail.url_utils import extract_urls_from_message, normalize_flickr_url
|
|
|
|
import re
|
|
|
|
|
|
app = flask.Flask(__name__)
app.debug = False

# Path fragment used to recognise English Wikipedia article URLs in user input.
enwiki = "en.wikipedia.org/wiki/"

COMMONS_CACHE_MAX_AGE = 86400 * 7  # Cache for 7 days
# How many recent Commons uploads to show on the start page.
RECENT_UPLOADS_COUNT = 24

# User agent for Wikimedia API requests
WIKIMEDIA_USER_AGENT = (
    "FlickrMail/1.0 (https://edwardbetts.com/flickr_mail/; edward@4angle.com)"
)

WIKIPEDIA_API = "https://en.wikipedia.org/w/api.php"

# Browser-like headers for Flickr requests
# (Flickr serves different/blocked content to obvious bots, so pretend to
# be a desktop Chrome browser.)
BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "Cache-Control": "max-age=0",
}


# Flickr license codes to human-readable names
FLICKR_LICENSES = {
    0: "All Rights Reserved",
    1: "CC BY-NC-SA 2.0",
    2: "CC BY-NC 2.0",
    3: "CC BY-NC-ND 2.0",
    4: "CC BY 2.0",
    5: "CC BY-SA 2.0",
    6: "CC BY-ND 2.0",
    7: "No known copyright",
    8: "US Government",
    9: "CC0",
    10: "Public Domain",
    # CC 4.0 licenses (codes confirmed via Flickr)
    16: "CC BY-NC-ND 4.0",
}

# Non-free CC licenses (NC or ND restrictions)
# NOTE(review): codes 11-13 are assumed to be non-free CC 4.0 variants and
# 14-15 free ones; they have no entry in FLICKR_LICENSES — confirm mapping.
NONFREE_CC_LICENSES = {1, 2, 3, 6, 11, 12, 13, 16}

# Wikipedia-compatible free licenses
FREE_LICENSES = {4, 5, 7, 8, 9, 10, 14, 15}


# Number of results Flickr shows per search page; used for pagination math.
PHOTOS_PER_PAGE = 25
|
|
|
|
|
|
@dataclasses.dataclass
class FlickrPhoto:
    """A single photo extracted from a Flickr search-results page."""

    id: str
    title: str
    path_alias: str
    owner_nsid: str
    username: str
    realname: str
    license: int
    thumb_url: str
    medium_url: str

    @property
    def flickr_url(self) -> str:
        """Address of this photo's page on Flickr."""
        return "https://flickr.com/photos/" + self.path_alias + "/" + self.id

    @property
    def license_name(self) -> str:
        """Readable license name, falling back to the raw code."""
        fallback = f"License {self.license}"
        return FLICKR_LICENSES.get(self.license, fallback)
|
|
|
|
|
|
@dataclasses.dataclass
class SearchResult:
    """Flickr search results with pagination metadata."""

    # Photos on the current result page (may be empty on parse failure).
    photos: list[FlickrPhoto]
    # Total matching photos reported by Flickr (may exceed what is reachable).
    total_photos: int
    # 1-based page number this result set represents.
    current_page: int
    # Total pages available; Flickr caps reachable results at 4000.
    total_pages: int
|
|
|
|
|
|
@dataclasses.dataclass
class CommonsUpload:
    """A recent upload to Wikimedia Commons obtained via Flickr mail."""

    title: str
    thumb_url: str
    commons_url: str
    flickr_url: str
    creator: str
    timestamp: str
    wikipedia_url: str = ""
    creator_profile_url: str = ""

    @property
    def is_wikidata_item(self) -> bool:
        """True when wikipedia_url actually points at a Wikidata Q-item."""
        url = self.wikipedia_url
        # A trailing /Q<digits> identifies a Wikidata item URL.
        return bool(url) and re.search(r"/Q\d+$", url) is not None

    @property
    def wiki_link_url(self) -> str:
        """Wikidata URL for Q-items, otherwise the stored Wikipedia URL."""
        url = self.wikipedia_url
        if url and self.is_wikidata_item:
            qid = re.search(r"(Q\d+)$", url)
            if qid:
                return f"https://www.wikidata.org/wiki/{qid.group(1)}"
        return url or ""

    @property
    def wiki_link_label(self) -> str:
        """Display label matching wiki_link_url's target."""
        return "Wikidata item" if self.is_wikidata_item else "Wikipedia article"
|
|
|
|
|
|
|
|
def fetch_commons_thumbnails(titles: list[str]) -> dict[str, str]:
    """Fetch 150px thumbnail URLs from the Commons API for file titles.

    Returns a mapping of title -> thumbnail URL; empty on any API failure
    (callers treat a missing thumbnail as "skip this entry").
    """
    if not titles:
        return {}

    # The Commons API accepts up to 50 titles per request.
    query = {
        "action": "query",
        "titles": "|".join(titles),
        "prop": "imageinfo",
        "iiprop": "url",
        "iiurlwidth": 150,
        "format": "json",
    }

    try:
        resp = requests.get(
            "https://commons.wikimedia.org/w/api.php",
            params=query,
            headers={"User-Agent": WIKIMEDIA_USER_AGENT},
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
    except (requests.RequestException, json.JSONDecodeError):
        # Best effort: network/parse trouble simply means no thumbnails.
        return {}

    pages = payload.get("query", {}).get("pages", {})
    # Keep only pages that actually carry a non-empty thumbnail URL.
    return {
        page.get("title", ""): info[0].get("thumburl", "")
        for page in pages.values()
        if (info := page.get("imageinfo", [])) and info[0].get("thumburl", "")
    }
|
|
|
|
|
|
def get_recent_commons_uploads() -> tuple[list[CommonsUpload], int]:
    """Get recent Commons uploads with thumbnails, filtered to those contacted via Flickr mail.

    Returns a tuple of (uploads_list, total_count) where total_count is the total number
    of uploads obtained via Flickr mail (not just the ones returned).
    """
    session = get_session()
    try:
        # Only uploads that have a matching sent message (inner join),
        # newest first.
        query = (
            session.query(FlickrUpload, SentMessage)
            .join(SentMessage)
            .order_by(FlickrUpload.timestamp.desc())
        )
        total_matched = query.count()
        if total_matched == 0:
            return [], 0

        recent = query.limit(RECENT_UPLOADS_COUNT).all()

        # Get thumbnails from cache
        titles = [upload.title for upload, msg in recent]
        now = int(time.time())
        cached = {
            tc.title: tc
            for tc in session.query(ThumbnailCache)
            .filter(ThumbnailCache.title.in_(titles))
            .all()
        }

        # Find titles needing fetch (missing or expired)
        titles_to_fetch = [
            t for t in titles
            if t not in cached or (now - (cached[t].fetched_at or 0)) > COMMONS_CACHE_MAX_AGE
        ]

        if titles_to_fetch:
            # Refresh the cache rows in place (or insert new ones) so the
            # loop below can read everything from `cached`.
            new_thumbs = fetch_commons_thumbnails(titles_to_fetch)
            for title, thumb_url in new_thumbs.items():
                existing = cached.get(title)
                if existing:
                    existing.thumb_url = thumb_url
                    existing.fetched_at = now
                else:
                    tc = ThumbnailCache(title=title, thumb_url=thumb_url, fetched_at=now)
                    session.add(tc)
                    cached[title] = tc
            session.commit()

        result: list[CommonsUpload] = []
        for upload, msg in recent:
            # Skip entries for which no thumbnail could be obtained.
            thumb_url = cached[upload.title].thumb_url if upload.title in cached else ""
            if not thumb_url:
                continue

            commons_url = f"https://commons.wikimedia.org/wiki/{upload.title.replace(' ', '_')}"

            result.append(
                CommonsUpload(
                    # Display title: strip the "File:" prefix and extension.
                    title=upload.title.replace("File:", "").rsplit(".", 1)[0],
                    thumb_url=thumb_url,
                    commons_url=commons_url,
                    flickr_url=upload.flickr_url or "",
                    creator=upload.creator or "Unknown",
                    # Keep only the date portion of the timestamp string.
                    timestamp=(upload.timestamp or "")[:10],
                    wikipedia_url=upload.wikipedia_url or "",
                    creator_profile_url=upload.creator_profile_url or "",
                )
            )

        return result, total_matched
    finally:
        session.close()
|
|
|
|
|
|
def get_previous_messages(flickr_user: str, flickr_username: str) -> list[dict]:
    """Get previous messages sent to a Flickr user.

    Matches both the display name (flickr_user) and the account username
    (flickr_username), case-insensitively, against the recipient field of
    previously sent messages.
    """
    candidates = {n.lower() for n in (flickr_user, flickr_username) if n}
    if not candidates:
        return []

    session = get_session()
    try:
        rows = (
            session.query(SentMessage)
            .filter(func.lower(SentMessage.recipient).in_(candidates))
            .all()
        )
        fields = ("message_id", "subject", "url", "recipient", "date")
        return [{field: getattr(row, field) for field in fields} for row in rows]
    finally:
        session.close()
|
|
|
|
|
|
def parse_category_input(category_input: str) -> str | None:
|
|
"""Parse category title from URL or direct input.
|
|
|
|
Returns the category title with 'Category:' prefix, or None if invalid.
|
|
"""
|
|
category_input = category_input.strip()
|
|
|
|
# Handle URL format: https://en.wikipedia.org/wiki/Category:Example
|
|
if "wikipedia.org" in category_input:
|
|
match = re.search(r"/wiki/(Category:[^#?]+)", category_input)
|
|
if match:
|
|
return unquote(match.group(1)).replace("_", " ")
|
|
return None
|
|
|
|
# Handle direct input - add Category: prefix if missing
|
|
if category_input.startswith("Category:"):
|
|
return category_input.replace("_", " ")
|
|
|
|
# Assume it's just the category name
|
|
return f"Category:{category_input.replace('_', ' ')}"
|
|
|
|
|
|
@dataclasses.dataclass
class ArticleWithoutImage:
    """A Wikipedia article that lacks a content image."""

    title: str
    pageid: int

    @property
    def wikipedia_url(self) -> str:
        """Link to the article on English Wikipedia."""
        slug = self.title.replace(" ", "_")
        return "https://en.wikipedia.org/wiki/" + slug

    @property
    def search_url(self) -> str:
        """Relative link that starts a Flickr Mail search for this article."""
        return "/?enwp=" + quote(self.title)
|
|
|
|
|
|
# Common non-content images to ignore when checking if an article has images
|
|
# Common non-content images to ignore when checking if an article has images
NON_CONTENT_IMAGE_PATTERNS = [
    "OOjs UI icon",
    "Commons-logo",
    "Symbol ",
    "Edit-ltr",
    "Ambox ",
    "Question book",
    "Wiki letter",
    "Text document",
    "Folder ",
    "Crystal ",
    "Nuvola ",
    "Gnome-",
    "Disambig ",
    "DAB ",
]


def has_content_image(images: list[dict]) -> bool:
    """Return True when any image title is not a known UI icon/logo."""
    # An image counts as content when its title matches none of the
    # non-content substrings.
    return any(
        all(pattern not in img.get("title", "") for pattern in NON_CONTENT_IMAGE_PATTERNS)
        for img in images
    )
|
|
|
|
|
|
def get_articles_without_images(category: str) -> list[ArticleWithoutImage]:
    """Get articles in a category that don't have images.

    Uses generator=categorymembers with prop=images to efficiently check
    multiple articles in a single API request, following continuation until
    all category members have been processed.
    """
    params = {
        "action": "query",
        "generator": "categorymembers",
        "gcmtitle": category,
        "gcmtype": "page",  # Only articles, not subcategories or files
        "gcmnamespace": "0",  # Main namespace only
        "gcmlimit": "max",
        "prop": "images",
        "imlimit": "max",  # Need enough to check all pages in batch
        "format": "json",
    }

    headers = {"User-Agent": WIKIMEDIA_USER_AGENT}
    articles_without_images: list[ArticleWithoutImage] = []
    continue_token: str | None = None

    # Follow the API continuation until all category members are seen.
    while True:
        request_params = params.copy()
        if continue_token:
            request_params["gcmcontinue"] = continue_token

        try:
            response = requests.get(
                WIKIPEDIA_API, params=request_params, headers=headers, timeout=30
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, json.JSONDecodeError) as e:
            # Best effort: return whatever was gathered before the failure.
            print(f"Wikipedia API error: {e}")
            break

        pages = data.get("query", {}).get("pages", {})
        for page in pages.values():
            images = page.get("images", [])

            # Skip if page has content images (not just UI icons)
            if has_content_image(images):
                continue

            title = page.get("title", "")
            pageid = page.get("pageid", 0)

            if title and pageid:
                articles_without_images.append(
                    ArticleWithoutImage(title=title, pageid=pageid)
                )

        # NOTE(review): image continuation ("imcontinue") is not followed,
        # so a page with many images might be misclassified — confirm.
        continue_token = data.get("continue", {}).get("gcmcontinue")
        if not continue_token:
            break

    # Sort by title for consistent display
    articles_without_images.sort(key=lambda a: a.title)
    return articles_without_images
|
|
|
|
|
|
def is_valid_flickr_image_url(url: str) -> bool:
    """Return True when *url* points at a Flickr static-image host."""
    known_prefixes = (
        "https://live.staticflickr.com/",
        "https://farm",  # farm1.staticflickr.com, farm2.staticflickr.com, ...
        "https://c1.staticflickr.com/",
        "https://c2.staticflickr.com/",
    )
    if not url.startswith(known_prefixes):
        return False
    # "https://farm" alone is too loose; require the staticflickr domain.
    if url.startswith("https://farm"):
        return ".staticflickr.com/" in url
    return True
|
|
|
|
|
|
def search_flickr(search_term: str, page: int = 1) -> SearchResult:
    """Search Flickr for photos matching the search term.

    Scrapes the public search page (no API key) and parses the embedded
    JSON model.  Raises requests exceptions on network/HTTP failure.
    """
    # Quote the term so Flickr performs an exact-phrase search.
    encoded_term = quote(f'"{search_term}"')
    url = f"https://flickr.com/search/?view_all=1&text={encoded_term}&page={page}"

    # Timeout prevents a slow/unresponsive Flickr from hanging the worker
    # indefinitely (the original call had no timeout).
    response = requests.get(url, headers=BROWSER_HEADERS, timeout=30)
    response.raise_for_status()

    return parse_flickr_search_results(response.text, page)
|
|
|
|
|
|
def parse_flickr_search_results(html: str, page: int = 1) -> SearchResult:
    """Parse Flickr search results HTML and extract photo data.

    Flickr embeds a ``modelExport:`` JSON object in the page; this locates
    it, balances braces (string-aware) to find its end, then extracts the
    photo list and pagination totals.  Returns an empty SearchResult on
    any parse failure.
    """
    empty_result = SearchResult(
        photos=[], total_photos=0, current_page=page, total_pages=0
    )

    # Find the modelExport JSON embedded in the page
    start = html.find("modelExport:")
    if start == -1:
        return empty_result

    start += len("modelExport:")
    # Skip whitespace, guarding against a truncated page where the marker
    # is the last thing in the document (previously raised IndexError).
    while start < len(html) and html[start].isspace():
        start += 1
    if start >= len(html):
        return empty_result

    # Parse the JSON by counting braces, tracking string/escape state so
    # braces inside string values are ignored.
    brace_count = 0
    i = start
    in_string = False
    escape_next = False

    while i < len(html):
        char = html[i]

        if escape_next:
            escape_next = False
            i += 1
            continue

        if char == "\\" and in_string:
            escape_next = True
            i += 1
            continue

        if char == '"':
            in_string = not in_string
        elif not in_string:
            if char == "{":
                brace_count += 1
            elif char == "}":
                brace_count -= 1
                if brace_count == 0:
                    json_str = html[start : i + 1]
                    break
        i += 1
    else:
        # Ran off the end of the document without balancing braces.
        return empty_result

    try:
        data = json.loads(json_str)
    except json.JSONDecodeError:
        return empty_result

    # Extract photos from the parsed data
    photos: list[FlickrPhoto] = []

    main = data.get("main", {})
    photos_models = main.get("search-photos-lite-models", [])

    if not photos_models:
        return empty_result

    model_data = photos_models[0].get("data", {})
    photos_container = model_data.get("photos", {}).get("data", {})
    photos_data = photos_container.get("_data", [])
    total_photos = photos_container.get("totalItems", 0)

    # Calculate total pages (Flickr caps reachable results at 4000)
    total_pages = min(total_photos, 4000) // PHOTOS_PER_PAGE
    if min(total_photos, 4000) % PHOTOS_PER_PAGE:
        total_pages += 1

    for photo_entry in photos_data:
        # Skip None entries (placeholders from pagination)
        if photo_entry is None:
            continue

        pd = photo_entry.get("data", {})
        if not pd:
            continue

        # Prefer the "q"/"n" size codes, falling back to "sq"/"m".
        sizes = pd.get("sizes", {}).get("data", {})
        thumb_data = sizes.get("q", sizes.get("sq", {})).get("data", {})
        medium_data = sizes.get("n", sizes.get("m", {})).get("data", {})

        thumb_url = thumb_data.get("url", "")
        medium_url = medium_data.get("url", "")

        # Ensure URLs have a protocol (Flickr emits protocol-relative URLs)
        if thumb_url.startswith("//"):
            thumb_url = "https:" + thumb_url
        if medium_url.startswith("//"):
            medium_url = "https:" + medium_url

        photos.append(
            FlickrPhoto(
                id=str(pd.get("id", "")),
                title=pd.get("title", ""),
                path_alias=pd.get("pathAlias", ""),
                owner_nsid=pd.get("ownerNsid", ""),
                username=pd.get("username", ""),
                realname=pd.get("realname", ""),
                license=pd.get("license", 0),
                thumb_url=thumb_url,
                medium_url=medium_url,
            )
        )

    return SearchResult(
        photos=photos,
        total_photos=total_photos,
        current_page=page,
        total_pages=total_pages,
    )
|
|
|
|
|
|
def log_interaction(
    interaction_type: str,
    query: str | None = None,
    flickr_url: str | None = None,
    wikipedia_url: str | None = None,
) -> None:
    """Log a user interaction (search or message generation) to the database.

    Records the event with a timestamp, the client IP address (first hop of
    X-Forwarded-For when behind a reverse proxy) and the User-Agent.
    Must be called inside a Flask request context.
    """
    forwarded_for = flask.request.headers.get("X-Forwarded-For")
    # Behind a proxy the real client is the first entry in the header list.
    ip_address = (
        forwarded_for.split(",")[0].strip() if forwarded_for else flask.request.remote_addr
    )
    user_agent = flask.request.headers.get("User-Agent")
    session = get_session()
    try:
        entry = InteractionLog(
            timestamp=int(time.time()),
            interaction_type=interaction_type,
            ip_address=ip_address,
            user_agent=user_agent,
            query=query,
            flickr_url=flickr_url,
            wikipedia_url=wikipedia_url,
        )
        session.add(entry)
        session.commit()
    except Exception:
        # Leave the session in a clean state before closing; the caller
        # still sees the original error.  (Previously a failed commit left
        # the session dirty with no rollback.)
        session.rollback()
        raise
    finally:
        session.close()
|
|
|
|
|
|
@app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
    """Render a custom debug page for unhandled exceptions.

    Returns the rendered show_error.html template and a 500 status.
    """
    exec_type, exc_value, current_traceback = sys.exc_info()
    assert exc_value
    # Werkzeug's debug helper produces a browsable HTML traceback.
    tb = DebugTraceback(exc_value)

    summary = tb.render_traceback_html(include_title=False)
    # NOTE(review): _te is a private werkzeug attribute (the underlying
    # TracebackException) — this may break on werkzeug upgrades.
    exc_lines = "".join(tb._te.format_exception_only())

    # The innermost frame and its argument names give extra context.
    last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
    last_frame_args = inspect.getargs(last_frame.f_code)

    return (
        flask.render_template(
            "show_error.html",
            plaintext=tb.render_traceback_text(),
            exception=exc_lines,
            exception_type=tb._te.exc_type.__name__,
            summary=summary,
            last_frame=last_frame,
            last_frame_args=last_frame_args,
        ),
        500,  # Internal Server Error
    )
|
|
|
|
|
|
@app.route("/")
def start() -> str:
    """Main page: start form, Flickr search results, or generated message.

    Depending on query parameters this renders:
    - the start form plus recent uploads (no/blank ``enwp``),
    - Flickr search results for an article (``enwp`` only),
    - upload options for a freely-licensed photo (``enwp`` + ``flickr``
      with a free ``license`` code),
    - a pre-written message to the photographer (``enwp`` + ``flickr``).
    """
    # Missing or blank article input: show the start page.  (This collapses
    # two previously duplicated branches for None and whitespace-only input.)
    enwp = (flask.request.args.get("enwp") or "").strip()
    if not enwp:
        recent_uploads, total_uploads = get_recent_commons_uploads()
        return flask.render_template(
            "combined.html", recent_uploads=recent_uploads, total_uploads=total_uploads
        )

    # The input is either a full article URL or a bare title.
    input_is = "url" if enwiki in enwp else "title"

    wikipedia_url: str
    wiki_part1: str
    wiki_part2: str
    if input_is == "url":
        # Split the URL into scheme+host part and article-title part.
        idx = enwp.find(enwiki) + len(enwiki)
        wiki_part2 = unquote(enwp[idx:])
        name = wiki_part2
        wiki_part1 = enwp[:idx]
        wikipedia_url = enwp
    else:
        name = enwp
        wiki_part1 = "https://" + enwiki
        wiki_part2 = name.replace(" ", "_")
        wikipedia_url = wiki_part1 + wiki_part2

    # Remove disambiguation suffix like "(academic)" for the Flickr search
    name = name.replace("_", " ")
    if " (" in name:
        name = name[: name.find(" (")]

    # Category the user navigated from, if any (used for back-links).
    cat = flask.request.args.get("cat")

    flickr_url = flask.request.args.get("flickr")
    if not flickr_url:
        # No photo chosen yet: search Flickr and show results.
        page = flask.request.args.get("page", 1, type=int)
        page = max(1, page)  # Ensure page is at least 1
        if page == 1:
            # Only log the first page so paging doesn't inflate the stats.
            log_interaction("search_article", query=name, wikipedia_url=wikipedia_url)
        search_result = search_flickr(name, page)
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            search_result=search_result,
            cat=cat,
        )

    # Strip any "/in/..." context suffix from the photo URL.
    if "/in/" in flickr_url:
        flickr_url = flickr_url[: flickr_url.find("/in/")]

    flickr_start = "https://flickr.com/photos/"

    # NOTE(review): assert is stripped under ``python -O``; input validation
    # via an explicit check would be more robust.
    assert flickr_url.startswith(flickr_start)
    # Username is the path segment between /photos/ and the next slash.
    flickr_username = flickr_url[
        len(flickr_start) : flickr_url.find("/", len(flickr_start))
    ]

    nsid = flickr_usrename_to_nsid(flickr_username)
    assert nsid
    print(nsid)  # NOTE(review): debug output — consider removing or logging

    # Get optional image URL for display; only accept Flickr-hosted images.
    img_url = flask.request.args.get("img")
    if img_url and not is_valid_flickr_image_url(img_url):
        img_url = None

    # Get flickr_user display name and build the profile URL.
    flickr_user = flask.request.args.get("flickr_user", "")
    flickr_user_url = f"https://www.flickr.com/photos/{flickr_username}/"

    # Check for previous messages to this user
    previous_messages = get_previous_messages(flickr_user, flickr_username)

    # Get license code if provided
    license_code = flask.request.args.get("license", type=int)
    license_name = (
        FLICKR_LICENSES.get(license_code, "") if license_code is not None else ""
    )

    is_free_license = license_code in FREE_LICENSES
    is_nonfree_cc = license_code in NONFREE_CC_LICENSES

    # For free licenses, show upload options instead of a message.
    if is_free_license:
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            flickr_url=flickr_url,
            img_url=img_url,
            license_code=license_code,
            license_name=license_name,
            is_free_license=True,
            wikipedia_url=wikipedia_url,
            flickr_user=flickr_user,
            flickr_user_url=flickr_user_url,
            cat=cat,
            previous_messages=previous_messages,
        )

    log_interaction(
        "generate_message",
        query=name,
        flickr_url=flickr_url,
        wikipedia_url=wikipedia_url,
    )

    msg = flask.render_template(
        "message.jinja",
        flickr_url=flickr_url,
        enwp=enwp,
        wikipedia_url=wikipedia_url,
        name=name,
        wiki_part1=wiki_part1,
        wiki_part2=wiki_part2,
        is_nonfree_cc=is_nonfree_cc,
        license_name=license_name,
    )

    subject = f"Request to use your photo of {name} on Wikipedia"

    # Split into paragraphs for display.
    lines = msg.split("\n\n")

    return flask.render_template(
        "combined.html",
        name=name,
        enwp=enwp,
        flickr_url=flickr_url,
        subject=subject,
        lines=lines,
        nsid=nsid,
        img_url=img_url,
        license_code=license_code,
        license_name=license_name,
        flickr_user=flickr_user,
        flickr_user_url=flickr_user_url,
        cat=cat,
        previous_messages=previous_messages,
    )
|
|
|
|
|
|
@app.route("/category")
def category_search() -> str:
    """Find articles in a Wikipedia category that need images."""
    raw = flask.request.args.get("cat", "").strip()

    # No input yet: show the bare search form.
    if not raw:
        return flask.render_template("category.html")

    category = parse_category_input(raw)
    if not category:
        return flask.render_template(
            "category.html",
            error="Invalid category format. Please enter a category name or URL.",
            cat=raw,
        )

    log_interaction("search_category", query=category)
    articles = get_articles_without_images(category)

    # Display name without the "Category:" prefix.
    display_name = category.replace("Category:", "")

    return flask.render_template(
        "category.html",
        cat=raw,
        category=category,
        category_name=display_name,
        articles=articles,
    )
|
|
|
|
|
|
def get_params(line_iter: collections.abc.Iterable[str]) -> str:
    """Find and return the params JSON blob from a Flickr profile page.

    Raises StopIteration if no line contains the marker.
    """
    marker = 'params: {"isEditingTestimonial":false,'
    # Return everything from the first "{" on the first matching line.
    matches = (line[line.find("{") :] for line in line_iter if marker in line)
    return next(matches)
|
|
|
|
|
|
def flickr_usrename_to_nsid(username: str) -> str:
    """Get the NSID for a Flickr username by scraping their profile page.

    NOTE: the misspelling ("usrename") is kept because callers use this name.
    Raises StopIteration if the params blob is not found on the page.
    """
    url = f"https://www.flickr.com/people/{username}/"
    # Timeout prevents a hung request from blocking the worker indefinitely
    # (the original call had no timeout).
    r = requests.get(url, headers=BROWSER_HEADERS, timeout=30)
    params_str = get_params(r.text.splitlines())
    # raw_decode tolerates trailing characters after the JSON object.
    params, _ = json.JSONDecoder().raw_decode(params_str)
    return typing.cast(str, params["nsid"])
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point.  NOTE(review): binding to 0.0.0.0 exposes the
    # Flask dev server on all network interfaces — confirm this is intended.
    app.run(host="0.0.0.0")
|