# flickr-mail/main.py
# Commit 32e81c2d93 (Edward Betts): Strip extra blank lines from rendered
# Flickr mail message.
#
# The Jinja control flow tags in the message template produced extra
# blank lines at the top, bottom, and middle of the rendered message.
# Strip the output and filter empty paragraphs so the copied message
# pastes cleanly.
#
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# 2026-02-08 14:13:59 +00:00
# (887 lines, 27 KiB, Python, executable file)

#!/usr/bin/python3
"""Find photos on flickr for Wikipedia articles and contact the photographer."""
import collections
import dataclasses
import inspect
import json
import sys
import time
import traceback
import typing
from urllib.parse import quote, unquote
import flask
import requests
import werkzeug
from sqlalchemy import func
from werkzeug.debug.tbtools import DebugTraceback
from flickr_mail.database import get_session
from flickr_mail.models import FlickrUpload, InteractionLog, SentMessage, ThumbnailCache
from flickr_mail.url_utils import extract_urls_from_message, normalize_flickr_url
import re
app = flask.Flask(__name__)
# Debug disabled so unhandled errors go through the errorhandler defined below.
app.debug = False

# Host/path fragment used to recognise English-Wikipedia article URLs.
enwiki = "en.wikipedia.org/wiki/"

COMMONS_CACHE_MAX_AGE = 86400 * 7  # Cache for 7 days
# Number of recent Commons uploads shown on the start page.
RECENT_UPLOADS_COUNT = 24

# User agent for Wikimedia API requests
WIKIMEDIA_USER_AGENT = (
    "FlickrMail/1.0 (https://edwardbetts.com/flickr_mail/; edward@4angle.com)"
)
WIKIPEDIA_API = "https://en.wikipedia.org/w/api.php"

# Browser-like headers for Flickr requests (used when scraping public pages)
BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "Cache-Control": "max-age=0",
}

# Flickr license codes to human-readable names
FLICKR_LICENSES = {
    0: "All Rights Reserved",
    1: "CC BY-NC-SA 2.0",
    2: "CC BY-NC 2.0",
    3: "CC BY-NC-ND 2.0",
    4: "CC BY 2.0",
    5: "CC BY-SA 2.0",
    6: "CC BY-ND 2.0",
    7: "No known copyright",
    8: "US Government",
    9: "CC0",
    10: "Public Domain",
    # CC 4.0 licenses (codes confirmed via Flickr)
    16: "CC BY-NC-ND 4.0",
}

# Non-free CC licenses (NC or ND restrictions)
# NOTE(review): codes 11-13 have no entry in FLICKR_LICENSES — presumably
# the other CC 4.0 NC/ND variants; verify against Flickr's license list.
NONFREE_CC_LICENSES = {1, 2, 3, 6, 11, 12, 13, 16}
# Wikipedia-compatible free licenses
# NOTE(review): codes 14-15 are also absent from FLICKR_LICENSES — confirm.
FREE_LICENSES = {4, 5, 7, 8, 9, 10, 14, 15}

# Photos per Flickr search results page; used by the pagination math in
# parse_flickr_search_results.
PHOTOS_PER_PAGE = 25
@dataclasses.dataclass
class FlickrPhoto:
    """A single photo extracted from Flickr search results."""

    id: str
    title: str
    path_alias: str
    owner_nsid: str
    username: str
    realname: str
    license: int
    thumb_url: str
    medium_url: str

    @property
    def flickr_url(self) -> str:
        """Link to this photo's page on Flickr."""
        return "https://flickr.com/photos/" + self.path_alias + "/" + self.id

    @property
    def license_name(self) -> str:
        """Human-readable name for the photo's license code."""
        fallback = f"License {self.license}"
        return FLICKR_LICENSES.get(self.license, fallback)
@dataclasses.dataclass
class SearchResult:
    """Flickr search results with pagination metadata."""

    # Photos on the current results page.
    photos: list[FlickrPhoto]
    # Total matches reported by Flickr (may exceed what is retrievable).
    total_photos: int
    # 1-based page number of this result set.
    current_page: int
    # Total page count; the parser caps results at 4000 photos.
    total_pages: int
@dataclasses.dataclass
class CommonsUpload:
    """Represents a recent upload to Wikimedia Commons."""

    title: str
    thumb_url: str
    commons_url: str
    flickr_url: str
    creator: str
    timestamp: str
    wikipedia_url: str = ""
    creator_profile_url: str = ""

    @property
    def is_wikidata_item(self) -> bool:
        """True when wikipedia_url actually points at a Wikidata item (.../Q123)."""
        url = self.wikipedia_url
        return bool(url) and re.search(r"/Q\d+$", url) is not None

    @property
    def wiki_link_url(self) -> str:
        """The URL to link to: the Wikidata item page for Q items, else wikipedia_url."""
        url = self.wikipedia_url
        if not url:
            return ""
        if self.is_wikidata_item:
            found = re.search(r"(Q\d+)$", url)
            if found:
                return "https://www.wikidata.org/wiki/" + found.group(1)
        return url

    @property
    def wiki_link_label(self) -> str:
        """Link text matching wiki_link_url."""
        if self.is_wikidata_item:
            return "Wikidata item"
        return "Wikipedia article"
def fetch_commons_thumbnails(titles: list[str]) -> dict[str, str]:
    """Fetch thumbnail URLs from the Commons API for the given file titles.

    Returns a mapping of file title -> 150px thumbnail URL.  Network or JSON
    failures are swallowed and yield an empty mapping (best effort).
    """
    if not titles:
        return {}
    # The Commons API accepts up to 50 titles joined with "|" per request.
    query = {
        "action": "query",
        "titles": "|".join(titles),
        "prop": "imageinfo",
        "iiprop": "url",
        "iiurlwidth": 150,
        "format": "json",
    }
    try:
        r = requests.get(
            "https://commons.wikimedia.org/w/api.php",
            params=query,
            headers={"User-Agent": WIKIMEDIA_USER_AGENT},
            timeout=10,
        )
        r.raise_for_status()
        payload = r.json()
    except (requests.RequestException, json.JSONDecodeError):
        # Caller simply shows fewer thumbnails on failure.
        return {}
    found: dict[str, str] = {}
    for page in payload.get("query", {}).get("pages", {}).values():
        info = page.get("imageinfo", [])
        if not info:
            continue
        thumb = info[0].get("thumburl", "")
        if thumb:
            found[page.get("title", "")] = thumb
    return found
def get_recent_commons_uploads() -> tuple[list[CommonsUpload], int]:
    """Get recent Commons uploads with thumbnails, filtered to those contacted via Flickr mail.

    Returns a tuple of (uploads_list, total_count) where total_count is the total number
    of uploads obtained via Flickr mail (not just the ones returned).
    """
    session = get_session()
    try:
        # Inner join against SentMessage restricts results to uploads whose
        # photographer was contacted through this tool, newest first.
        query = (
            session.query(FlickrUpload, SentMessage)
            .join(SentMessage)
            .order_by(FlickrUpload.timestamp.desc())
        )
        total_matched = query.count()
        if total_matched == 0:
            return [], 0
        recent = query.limit(RECENT_UPLOADS_COUNT).all()
        # Get thumbnails from cache
        titles = [upload.title for upload, msg in recent]
        now = int(time.time())
        cached = {
            tc.title: tc
            for tc in session.query(ThumbnailCache)
            .filter(ThumbnailCache.title.in_(titles))
            .all()
        }
        # Find titles needing fetch (missing or expired)
        titles_to_fetch = [
            t for t in titles
            if t not in cached or (now - (cached[t].fetched_at or 0)) > COMMONS_CACHE_MAX_AGE
        ]
        if titles_to_fetch:
            new_thumbs = fetch_commons_thumbnails(titles_to_fetch)
            for title, thumb_url in new_thumbs.items():
                existing = cached.get(title)
                if existing:
                    # Refresh an expired cache row in place.
                    existing.thumb_url = thumb_url
                    existing.fetched_at = now
                else:
                    tc = ThumbnailCache(title=title, thumb_url=thumb_url, fetched_at=now)
                    session.add(tc)
                    cached[title] = tc
            session.commit()
        result: list[CommonsUpload] = []
        for upload, msg in recent:
            thumb_url = cached[upload.title].thumb_url if upload.title in cached else ""
            # Uploads with no resolvable thumbnail are dropped from the display.
            if not thumb_url:
                continue
            commons_url = f"https://commons.wikimedia.org/wiki/{upload.title.replace(' ', '_')}"
            result.append(
                CommonsUpload(
                    # Strip the "File:" prefix and file extension for display.
                    title=upload.title.replace("File:", "").rsplit(".", 1)[0],
                    thumb_url=thumb_url,
                    commons_url=commons_url,
                    flickr_url=upload.flickr_url or "",
                    creator=upload.creator or "Unknown",
                    # Keep only the YYYY-MM-DD part of the timestamp.
                    timestamp=(upload.timestamp or "")[:10],
                    wikipedia_url=upload.wikipedia_url or "",
                    creator_profile_url=upload.creator_profile_url or "",
                )
            )
        return result, total_matched
    finally:
        session.close()
def get_previous_messages(flickr_user: str, flickr_username: str) -> list[dict]:
    """Get previous messages sent to a Flickr user.

    Checks both the display name (flickr_user) and username (flickr_username)
    against the recipient field in the messages index.
    """
    # Compare case-insensitively against either identifier.
    candidates = {n.lower() for n in (flickr_user, flickr_username) if n}
    if not candidates:
        return []
    session = get_session()
    try:
        rows = (
            session.query(SentMessage)
            .filter(func.lower(SentMessage.recipient).in_(candidates))
            .all()
        )
        return [
            {
                "message_id": row.message_id,
                "subject": row.subject,
                "url": row.url,
                "recipient": row.recipient,
                "date": row.date,
            }
            for row in rows
        ]
    finally:
        session.close()
def parse_category_input(category_input: str) -> str | None:
"""Parse category title from URL or direct input.
Returns the category title with 'Category:' prefix, or None if invalid.
"""
category_input = category_input.strip()
# Handle URL format: https://en.wikipedia.org/wiki/Category:Example
if "wikipedia.org" in category_input:
match = re.search(r"/wiki/(Category:[^#?]+)", category_input)
if match:
return unquote(match.group(1)).replace("_", " ")
return None
# Handle direct input - add Category: prefix if missing
if category_input.startswith("Category:"):
return category_input.replace("_", " ")
# Assume it's just the category name
return f"Category:{category_input.replace('_', ' ')}"
@dataclasses.dataclass
class ArticleWithoutImage:
    """Represents a Wikipedia article that needs an image."""

    title: str
    pageid: int

    @property
    def wikipedia_url(self) -> str:
        """URL to the Wikipedia article."""
        underscored = self.title.replace(" ", "_")
        return f"https://en.wikipedia.org/wiki/{underscored}"

    @property
    def search_url(self) -> str:
        """URL to search for this article in Flickr Mail."""
        return "/?enwp=" + quote(self.title)
@dataclasses.dataclass
class CategoryResult:
    """Result of a paginated category search."""

    # Articles found that lack a content image.
    articles: list[ArticleWithoutImage]
    # API continuation token for fetching the next batch, or None when done.
    gcmcontinue: str | None
# Common non-content images to ignore when checking if an article has images
NON_CONTENT_IMAGE_PATTERNS = [
    "OOjs UI icon",
    "Commons-logo",
    "Symbol ",
    "Edit-ltr",
    "Ambox ",
    "Question book",
    "Wiki letter",
    "Text document",
    "Folder ",
    "Crystal ",
    "Nuvola ",
    "Gnome-",
    "Disambig ",
    "DAB ",
]


def has_content_image(images: list[dict]) -> bool:
    """Return True if any image looks like real content rather than a UI icon/logo."""
    for img in images:
        name = img.get("title", "")
        # An image counts as content when no non-content pattern appears in its title.
        if all(pattern not in name for pattern in NON_CONTENT_IMAGE_PATTERNS):
            return True
    return False
def get_articles_without_images(
    category: str,
    limit: int = 200,
    gcmcontinue: str | None = None,
) -> CategoryResult:
    """Get articles in a category that don't have images.

    Uses generator=categorymembers with prop=images to efficiently check
    multiple articles in a single API request, following continuation until
    the limit is reached or all category members have been processed.
    """
    params = {
        "action": "query",
        "generator": "categorymembers",
        "gcmtitle": category,
        "gcmtype": "page",  # Only articles, not subcategories or files
        "gcmnamespace": "0",  # Main namespace only
        "gcmlimit": "50",  # Small batches so images fit in one response
        "prop": "images",
        "imlimit": "max",
        "format": "json",
    }
    headers = {"User-Agent": WIKIMEDIA_USER_AGENT}
    articles_without_images: list[ArticleWithoutImage] = []
    # Pages can repeat across continuation responses; dedupe by pageid.
    seen_pageids: set[int] = set()
    next_gcmcontinue: str | None = None
    # Build initial continue params from the external pagination token
    continue_params: dict[str, str] = {}
    if gcmcontinue:
        continue_params = {"gcmcontinue": gcmcontinue, "continue": "gcmcontinue||"}
    while True:
        request_params = params.copy()
        request_params.update(continue_params)
        try:
            response = requests.get(
                WIKIPEDIA_API, params=request_params, headers=headers, timeout=30
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, json.JSONDecodeError) as e:
            # Best effort: return whatever was collected before the failure.
            print(f"Wikipedia API error: {e}")
            break
        pages = data.get("query", {}).get("pages", {})
        for page in pages.values():
            pageid = page.get("pageid", 0)
            if not pageid or pageid in seen_pageids:
                continue
            seen_pageids.add(pageid)
            images = page.get("images", [])
            # Skip if page has content images (not just UI icons)
            if has_content_image(images):
                continue
            title = page.get("title", "")
            if title:
                articles_without_images.append(
                    ArticleWithoutImage(title=title, pageid=pageid)
                )
        api_continue = data.get("continue")
        if not api_continue:
            break
        # Only stop at generator boundaries where we have a resumable token
        gcmc = api_continue.get("gcmcontinue")
        if gcmc and len(articles_without_images) >= limit:
            next_gcmcontinue = gcmc
            break
        continue_params = api_continue
    # Sort by title for consistent display
    articles_without_images.sort(key=lambda a: a.title)
    return CategoryResult(
        articles=articles_without_images,
        gcmcontinue=next_gcmcontinue,
    )
def is_valid_flickr_image_url(url: str) -> bool:
    """Check if URL is a valid Flickr static image URL.

    Accepts https URLs on live.staticflickr.com, c1/c2.staticflickr.com, and
    farm<N>.staticflickr.com.  The hostname is matched in full at the start of
    the URL, so spoofed hosts such as
    "https://farm.evil.com/?x=.staticflickr.com/" (which the previous
    substring check accepted) are rejected.
    """
    return bool(re.match(r"https://(live|c1|c2|farm\d+)\.staticflickr\.com/", url))
def search_flickr(search_term: str, page: int = 1) -> SearchResult:
    """Search Flickr for photos matching the search term.

    Scrapes the public search page and parses the embedded JSON model.
    Raises requests.HTTPError on a bad HTTP status.
    """
    encoded_term = quote(search_term)
    url = f"https://flickr.com/search/?view_all=1&text={encoded_term}&page={page}"
    # Timeout added for consistency with the other outbound requests in this
    # module, so a slow Flickr response cannot hang the request handler.
    response = requests.get(url, headers=BROWSER_HEADERS, timeout=30)
    response.raise_for_status()
    return parse_flickr_search_results(response.text, page)
def parse_flickr_search_results(html: str, page: int = 1) -> SearchResult:
    """Parse Flickr search results HTML and extract photo data.

    Locates the "modelExport:" JSON blob embedded in the page, extracts it by
    balancing braces (string- and escape-aware), and converts the photo
    entries into FlickrPhoto objects.  Returns an empty SearchResult on any
    parse failure.
    """
    empty_result = SearchResult(
        photos=[], total_photos=0, current_page=page, total_pages=0
    )
    # Find the modelExport JSON embedded in the page
    start = html.find("modelExport:")
    if start == -1:
        return empty_result
    start += len("modelExport:")
    # Skip whitespace between "modelExport:" and the opening brace.
    while html[start].isspace():
        start += 1
    # Parse the JSON by counting braces
    brace_count = 0
    i = start
    in_string = False
    escape_next = False
    while i < len(html):
        char = html[i]
        if escape_next:
            # Previous char was a backslash inside a string: this one is literal.
            escape_next = False
            i += 1
            continue
        if char == "\\" and in_string:
            escape_next = True
            i += 1
            continue
        if char == '"' and not escape_next:
            in_string = not in_string
        elif not in_string:
            if char == "{":
                brace_count += 1
            elif char == "}":
                brace_count -= 1
                if brace_count == 0:
                    # Matched the outermost closing brace: this is the blob.
                    json_str = html[start : i + 1]
                    break
        i += 1
    else:
        # Reached end of HTML without balancing braces.
        return empty_result
    try:
        data = json.loads(json_str)
    except json.JSONDecodeError:
        return empty_result
    # Extract photos from the parsed data
    photos: list[FlickrPhoto] = []
    main = data.get("main", {})
    photos_models = main.get("search-photos-lite-models", [])
    if not photos_models:
        return empty_result
    model_data = photos_models[0].get("data", {})
    photos_container = model_data.get("photos", {}).get("data", {})
    photos_data = photos_container.get("_data", [])
    total_photos = photos_container.get("totalItems", 0)
    # Calculate total pages (Flickr caps at 4000 results)
    total_pages = min(total_photos, 4000) // PHOTOS_PER_PAGE
    if min(total_photos, 4000) % PHOTOS_PER_PAGE:
        total_pages += 1
    for photo_entry in photos_data:
        # Skip None entries (placeholders from pagination)
        if photo_entry is None:
            continue
        pd = photo_entry.get("data", {})
        if not pd:
            continue
        # Prefer the "q" (square) thumbnail and "n" (small) sizes, with fallbacks.
        sizes = pd.get("sizes", {}).get("data", {})
        thumb_data = sizes.get("q", sizes.get("sq", {})).get("data", {})
        medium_data = sizes.get("n", sizes.get("m", {})).get("data", {})
        thumb_url = thumb_data.get("url", "")
        medium_url = medium_data.get("url", "")
        # Ensure URLs have protocol
        if thumb_url.startswith("//"):
            thumb_url = "https:" + thumb_url
        if medium_url.startswith("//"):
            medium_url = "https:" + medium_url
        photos.append(
            FlickrPhoto(
                id=str(pd.get("id", "")),
                title=pd.get("title", ""),
                path_alias=pd.get("pathAlias", ""),
                owner_nsid=pd.get("ownerNsid", ""),
                username=pd.get("username", ""),
                realname=pd.get("realname", ""),
                license=pd.get("license", 0),
                thumb_url=thumb_url,
                medium_url=medium_url,
            )
        )
    return SearchResult(
        photos=photos,
        total_photos=total_photos,
        current_page=page,
        total_pages=total_pages,
    )
def log_interaction(
    interaction_type: str,
    query: str | None = None,
    flickr_url: str | None = None,
    wikipedia_url: str | None = None,
) -> None:
    """Log a user interaction to the database.

    Records the client IP (first hop of X-Forwarded-For when present,
    otherwise the direct peer address) and User-Agent alongside the event.
    """
    forwarded = flask.request.headers.get("X-Forwarded-For")
    if forwarded:
        ip_address = forwarded.split(",")[0].strip()
    else:
        ip_address = flask.request.remote_addr
    session = get_session()
    try:
        session.add(
            InteractionLog(
                timestamp=int(time.time()),
                interaction_type=interaction_type,
                ip_address=ip_address,
                user_agent=flask.request.headers.get("User-Agent"),
                query=query,
                flickr_url=flickr_url,
                wikipedia_url=wikipedia_url,
            )
        )
        session.commit()
    finally:
        session.close()
@app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
    """Handle exception.

    Renders a rich debug page for unhandled exceptions using werkzeug's
    traceback tooling.  NOTE(review): tb._te is a private werkzeug attribute
    and may break on werkzeug upgrades.
    """
    exec_type, exc_value, current_traceback = sys.exc_info()
    assert exc_value
    tb = DebugTraceback(exc_value)
    summary = tb.render_traceback_html(include_title=False)
    exc_lines = "".join(tb._te.format_exception_only())
    # Innermost frame of the traceback, plus its function's argument names.
    last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
    last_frame_args = inspect.getargs(last_frame.f_code)
    return (
        flask.render_template(
            "show_error.html",
            plaintext=tb.render_traceback_text(),
            exception=exc_lines,
            exception_type=tb._te.exc_type.__name__,
            summary=summary,
            last_frame=last_frame,
            last_frame_args=last_frame_args,
        ),
        500,
    )
@app.route("/")
def start() -> str:
    """Start form.

    Drives the whole workflow from a single route:
      1. No "enwp" parameter: show the start form with recent uploads.
      2. "enwp" but no "flickr" parameter: search Flickr for the article name.
      3. "flickr" photo chosen with a free license: show upload options.
      4. Otherwise: render a contact message for the photographer.
    """
    enwp = flask.request.args.get("enwp")
    if not enwp:
        recent_uploads, total_uploads = get_recent_commons_uploads()
        return flask.render_template(
            "combined.html", recent_uploads=recent_uploads, total_uploads=total_uploads
        )
    enwp = enwp.strip()
    if not enwp:
        # Whitespace-only input: treat the same as no input.
        recent_uploads, total_uploads = get_recent_commons_uploads()
        return flask.render_template(
            "combined.html", recent_uploads=recent_uploads, total_uploads=total_uploads
        )
    # "enwp" may be a full article URL or just a title.
    input_is = "url" if enwiki in enwp else "title"
    wikipedia_url: str
    wiki_part1: str
    wiki_part2: str
    if input_is == "url":
        # NOTE: this local "start" shadows the view function's own name.
        start = enwp.find(enwiki) + len(enwiki)
        wiki_part2 = unquote(enwp[start:])
        name = wiki_part2
        wiki_part1 = enwp[:start]
        wikipedia_url = enwp
    else:
        name = enwp
        wiki_part1 = "https://" + enwiki
        wiki_part2 = name.replace(" ", "_")
        wikipedia_url = wiki_part1 + wiki_part2
    # Remove disambiguation suffix like "(academic)" for Flickr search
    name = name.replace("_", " ")
    if " (" in name:
        name = name[: name.find(" (")]
    # Get category param if coming from category search
    cat = flask.request.args.get("cat")
    # Allow overriding the Flickr search term (default includes quotes for phrase search)
    flickr_search = flask.request.args.get("flickr_search") or f'"{name}"'
    flickr_url = flask.request.args.get("flickr")
    if not flickr_url:
        # Search Flickr for photos
        page = flask.request.args.get("page", 1, type=int)
        page = max(1, page)  # Ensure page is at least 1
        if page == 1:
            # Only log the first page so paging doesn't inflate the stats.
            log_interaction("search_article", query=flickr_search, wikipedia_url=wikipedia_url)
        search_result = search_flickr(flickr_search, page)
        # If no results and using default search, retry without middle initial
        if not search_result.photos and not flask.request.args.get("flickr_search"):
            without_middle = re.sub(r" [A-Z]\. ", " ", name)
            if without_middle != name:
                flickr_search = f'"{without_middle}"'
                search_result = search_flickr(flickr_search, page)
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            search_result=search_result,
            cat=cat,
            flickr_search=flickr_search,
        )
    # Strip context suffixes like ".../in/photostream" from the photo URL.
    if "/in/" in flickr_url:
        flickr_url = flickr_url[: flickr_url.find("/in/")]
    flickr_start = "https://flickr.com/photos/"
    assert flickr_url.startswith(flickr_start)
    # Path segment after /photos/ is the owner's username or path alias.
    flickr_username = flickr_url[
        len(flickr_start) : flickr_url.find("/", len(flickr_start))
    ]
    nsid = flickr_usrename_to_nsid(flickr_username)
    assert nsid
    print(nsid)
    # Get optional image URL for display, validate it's from Flickr
    img_url = flask.request.args.get("img")
    if img_url and not is_valid_flickr_image_url(img_url):
        img_url = None
    # Get flickr_user name and build profile URL
    flickr_user = flask.request.args.get("flickr_user", "")
    flickr_user_url = f"https://www.flickr.com/photos/{flickr_username}/"
    # Check for previous messages to this user
    previous_messages = get_previous_messages(flickr_user, flickr_username)
    # Get license code if provided
    license_code = flask.request.args.get("license", type=int)
    license_name = (
        FLICKR_LICENSES.get(license_code, "") if license_code is not None else ""
    )
    is_free_license = license_code in FREE_LICENSES
    is_nonfree_cc = license_code in NONFREE_CC_LICENSES
    # For free licenses, show upload options instead of message
    if is_free_license:
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            flickr_url=flickr_url,
            img_url=img_url,
            license_code=license_code,
            license_name=license_name,
            is_free_license=True,
            wikipedia_url=wikipedia_url,
            flickr_user=flickr_user,
            flickr_user_url=flickr_user_url,
            cat=cat,
            previous_messages=previous_messages,
            flickr_search=flickr_search,
        )
    log_interaction(
        "generate_message",
        query=name,
        flickr_url=flickr_url,
        wikipedia_url=wikipedia_url,
    )
    msg = flask.render_template(
        "message.jinja",
        flickr_url=flickr_url,
        enwp=enwp,
        wikipedia_url=wikipedia_url,
        name=name,
        wiki_part1=wiki_part1,
        wiki_part2=wiki_part2,
        is_nonfree_cc=is_nonfree_cc,
        license_name=license_name,
    )
    subject = f"Request to use your photo of {name} on Wikipedia"
    # Split into paragraphs, dropping blanks left by template control tags.
    lines = [line for line in msg.strip().split("\n\n") if line.strip()]
    return flask.render_template(
        "combined.html",
        name=name,
        enwp=enwp,
        flickr_url=flickr_url,
        subject=subject,
        lines=lines,
        nsid=nsid,
        img_url=img_url,
        license_code=license_code,
        license_name=license_name,
        flickr_user=flickr_user,
        flickr_user_url=flickr_user_url,
        cat=cat,
        previous_messages=previous_messages,
        flickr_search=flickr_search,
    )
@app.route("/category")
def category_search() -> str:
    """Find articles in a Wikipedia category that need images."""
    raw = flask.request.args.get("cat", "").strip()
    if not raw:
        # No input yet: show the empty search form.
        return flask.render_template("category.html")
    category = parse_category_input(raw)
    if not category:
        return flask.render_template(
            "category.html",
            error="Invalid category format. Please enter a category name or URL.",
            cat=raw,
        )
    log_interaction("search_category", query=category)
    token = flask.request.args.get("gcmcontinue") or None
    result = get_articles_without_images(category, gcmcontinue=token)
    # Display name drops the "Category:" prefix.
    display_name = category.replace("Category:", "")
    return flask.render_template(
        "category.html",
        cat=raw,
        category=category,
        category_name=display_name,
        articles=result.articles,
        gcmcontinue=result.gcmcontinue,
    )
def get_params(line_iter: collections.abc.Iterable[str]) -> str:
    """Find and return params from flickr profile page.

    Scans lines for the params marker and returns the text from the first
    opening brace onward; raises StopIteration when no line matches.
    """
    marker = 'params: {"isEditingTestimonial":false,'
    hits = (line[line.find("{") :] for line in line_iter if marker in line)
    return next(hits)
def flickr_usrename_to_nsid(username: str) -> str:
    """Get NSID from flickr username.

    Fetches the user's public profile page and extracts the "nsid" field from
    the embedded params JSON.  Raises StopIteration (via get_params) if the
    params blob is absent and KeyError if it lacks "nsid".

    NOTE: the "usrename" typo in the name is kept for caller compatibility.
    """
    url = f"https://www.flickr.com/people/{username}/"
    # Timeout added for consistency with the other outbound requests in this
    # module, so a slow Flickr response cannot hang the request handler.
    r = requests.get(url, headers=BROWSER_HEADERS, timeout=30)
    params_str = get_params(r.text.splitlines())
    # raw_decode parses the leading JSON object and ignores trailing text.
    params, _ = json.JSONDecoder().raw_decode(params_str)
    return typing.cast(str, params["nsid"])
if __name__ == "__main__":
    # Development entry point: Flask's built-in server on all interfaces.
    app.run(host="0.0.0.0")