Show recent Commons uploads obtained via Flickr mail

Display recent Wikimedia Commons uploads on the home page, filtered to
only show images that were obtained by contacting creators via Flickr
mail. Each upload shows:
- Thumbnail linking to Commons
- Creator name linking to their Flickr profile
- Link to the illustrated Wikipedia article (or Wikidata item)

Features:
- Parse sent mail messages to extract Flickr and Wikipedia URLs
- Match Commons uploads with sent mail by normalized Flickr URL
- Cache Commons API thumbnail responses and sent mail index
- Handle Wikidata item URLs (Q-numbers) with correct links
- Add update_flickr_uploads.py script to find uploads from UploadWizard
  contributions by checking Commons API metadata

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Edward Betts 2026-02-06 10:42:34 +00:00
parent 0062de8ede
commit a2d29d7937
6 changed files with 562 additions and 3 deletions

317
main.py
View file

@ -6,8 +6,10 @@ import dataclasses
import inspect
import json
import sys
import time
import traceback
import typing
from pathlib import Path
from urllib.parse import quote, unquote
import flask
@ -16,11 +18,33 @@ import werkzeug
from werkzeug.debug.tbtools import DebugTraceback
import re
app = flask.Flask(__name__)
app.debug = False
enwiki = "en.wikipedia.org/wiki/"
# Path to Commons contributions data and sent mail
COMMONS_UPLOADS_FILE = (
Path(__file__).parent / "commons_contributions" / "flickr_uploads.json"
)
COMMONS_CACHE_FILE = (
Path(__file__).parent / "commons_contributions" / "thumbnail_cache.json"
)
SENT_MAIL_DIR = Path(__file__).parent / "sent_mail" / "messages"
SENT_MAIL_INDEX_CACHE = (
Path(__file__).parent / "commons_contributions" / "sent_mail_index.json"
)
COMMONS_CACHE_MAX_AGE = 86400 * 7 # Cache for 7 days
RECENT_UPLOADS_COUNT = 24
# User agent for Commons API requests
COMMONS_USER_AGENT = (
"FlickrMail/1.0 (https://edwardbetts.com/flickr_mail/; edward@4angle.com)"
)
# Browser-like headers for Flickr requests
BROWSER_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
@ -91,6 +115,289 @@ class SearchResult:
total_pages: int
@dataclasses.dataclass
class CommonsUpload:
"""Represents a recent upload to Wikimedia Commons."""
title: str
thumb_url: str
commons_url: str
flickr_url: str
creator: str
timestamp: str
wikipedia_url: str = ""
creator_profile_url: str = ""
@property
def is_wikidata_item(self) -> bool:
"""Check if the wikipedia_url is actually a Wikidata item."""
if not self.wikipedia_url:
return False
# Match Q followed by digits at the end of the URL
return bool(re.search(r"/Q\d+$", self.wikipedia_url))
@property
def wiki_link_url(self) -> str:
"""Get the correct URL (Wikidata if it's a Q item, otherwise Wikipedia)."""
if not self.wikipedia_url:
return ""
if self.is_wikidata_item:
# Extract Q-id and build Wikidata URL
match = re.search(r"(Q\d+)$", self.wikipedia_url)
if match:
return f"https://www.wikidata.org/wiki/{match.group(1)}"
return self.wikipedia_url
@property
def wiki_link_label(self) -> str:
"""Get the label for the wiki link."""
return "Wikidata item" if self.is_wikidata_item else "Wikipedia article"
def normalize_flickr_url(url: str) -> str:
"""Normalize a Flickr photo URL for comparison."""
# Remove protocol
url = url.replace("https://", "").replace("http://", "")
# Remove www.
url = url.replace("www.", "")
# Remove trailing slash
url = url.rstrip("/")
# Ensure it starts with flickr.com
if not url.startswith("flickr.com"):
return ""
return url
def extract_urls_from_message(body: str) -> tuple[str, str]:
"""Extract flickr URL and Wikipedia URL from message body."""
flickr_url = ""
wikipedia_url = ""
# Find flickr photo URLs
flickr_pattern = r"(?:https?://)?(?:www\.)?flickr\.com/photos/[^/\s]+/\d+"
flickr_matches = re.findall(flickr_pattern, body)
if flickr_matches:
flickr_url = flickr_matches[0]
if not flickr_url.startswith("http"):
flickr_url = "https://" + flickr_url
# Find Wikipedia URLs
wiki_pattern = r"(?:https?://)?(?:www\.)?en\.wikipedia\.org/wiki/[^\s<\])]+"
wiki_matches = re.findall(wiki_pattern, body)
if wiki_matches:
wikipedia_url = wiki_matches[0]
if not wikipedia_url.startswith("http"):
wikipedia_url = "https://" + wikipedia_url
return flickr_url, wikipedia_url
def build_sent_mail_index() -> dict[str, dict[str, str]]:
"""Build an index of sent mail: normalized_flickr_url -> {wikipedia_url, recipient}."""
if not SENT_MAIL_DIR.exists():
return {}
# Check if we have a cached index
if SENT_MAIL_INDEX_CACHE.exists():
try:
with open(SENT_MAIL_INDEX_CACHE) as f:
cache = json.load(f)
# Check if cache is still valid (compare file count)
json_files = list(SENT_MAIL_DIR.glob("*.json"))
if cache.get("file_count") == len(json_files):
return cache.get("index", {})
except (json.JSONDecodeError, OSError):
pass
index: dict[str, dict[str, str]] = {}
json_files = list(SENT_MAIL_DIR.glob("*.json"))
for json_file in json_files:
try:
with open(json_file) as f:
message = json.load(f)
except (json.JSONDecodeError, OSError):
continue
# Skip replies - we want original requests
subject = message.get("subject", "")
if subject.startswith("Re:"):
continue
body = message.get("body", "")
flickr_url, wikipedia_url = extract_urls_from_message(body)
if not flickr_url:
continue
normalized = normalize_flickr_url(flickr_url)
if not normalized:
continue
# Extract creator profile URL from flickr URL
# flickr.com/photos/username/12345 -> flickr.com/photos/username
parts = flickr_url.split("/")
creator_profile = ""
for i, part in enumerate(parts):
if part == "photos" and i + 1 < len(parts):
username = parts[i + 1]
creator_profile = f"https://www.flickr.com/photos/{username}"
break
index[normalized] = {
"wikipedia_url": wikipedia_url,
"creator_profile_url": creator_profile,
"recipient": message.get("recipient", ""),
}
# Cache the index
try:
with open(SENT_MAIL_INDEX_CACHE, "w") as f:
json.dump({"file_count": len(json_files), "index": index}, f)
except OSError:
pass
return index
def load_commons_thumbnail_cache() -> dict[str, typing.Any]:
"""Load the thumbnail cache from disk."""
if not COMMONS_CACHE_FILE.exists():
return {"timestamp": 0, "thumbnails": {}}
try:
with open(COMMONS_CACHE_FILE) as f:
return typing.cast(dict[str, typing.Any], json.load(f))
except (json.JSONDecodeError, OSError):
return {"timestamp": 0, "thumbnails": {}}
def save_commons_thumbnail_cache(cache: dict[str, typing.Any]) -> None:
"""Save the thumbnail cache to disk."""
try:
with open(COMMONS_CACHE_FILE, "w") as f:
json.dump(cache, f)
except OSError:
pass # Ignore cache write errors
def fetch_commons_thumbnails(titles: list[str]) -> dict[str, str]:
"""Fetch thumbnail URLs from Commons API for the given file titles."""
if not titles:
return {}
# Commons API allows up to 50 titles per request
params = {
"action": "query",
"titles": "|".join(titles),
"prop": "imageinfo",
"iiprop": "url",
"iiurlwidth": 150,
"format": "json",
}
headers = {"User-Agent": COMMONS_USER_AGENT}
try:
response = requests.get(
"https://commons.wikimedia.org/w/api.php",
params=params,
headers=headers,
timeout=10,
)
response.raise_for_status()
data = response.json()
except (requests.RequestException, json.JSONDecodeError):
return {}
thumbnails: dict[str, str] = {}
pages = data.get("query", {}).get("pages", {})
for page in pages.values():
title = page.get("title", "")
imageinfo = page.get("imageinfo", [])
if imageinfo:
thumb_url = imageinfo[0].get("thumburl", "")
if thumb_url:
thumbnails[title] = thumb_url
return thumbnails
def get_recent_commons_uploads() -> list[CommonsUpload]:
"""Get recent Commons uploads with thumbnails, filtered to those contacted via Flickr mail."""
if not COMMONS_UPLOADS_FILE.exists():
return []
try:
with open(COMMONS_UPLOADS_FILE) as f:
all_uploads = json.load(f)
except (json.JSONDecodeError, OSError):
return []
# Build sent mail index
sent_mail_index = build_sent_mail_index()
# Filter uploads to only those with matching sent mail
uploads_with_mail: list[dict[str, typing.Any]] = []
for upload in all_uploads:
flickr_url = upload.get("flickr_url", "")
normalized = normalize_flickr_url(flickr_url)
if normalized and normalized in sent_mail_index:
upload["_mail_info"] = sent_mail_index[normalized]
uploads_with_mail.append(upload)
if len(uploads_with_mail) >= RECENT_UPLOADS_COUNT:
break
if not uploads_with_mail:
return []
# Load cache and check if it's still valid
cache = load_commons_thumbnail_cache()
cache_age = time.time() - cache.get("timestamp", 0)
cached_thumbs = cache.get("thumbnails", {})
# Find which titles need fetching
titles = [u["title"] for u in uploads_with_mail]
titles_to_fetch = [t for t in titles if t not in cached_thumbs]
# Fetch missing thumbnails or refresh if cache is old
if titles_to_fetch or cache_age > COMMONS_CACHE_MAX_AGE:
new_thumbs = fetch_commons_thumbnails(
titles if cache_age > COMMONS_CACHE_MAX_AGE else titles_to_fetch
)
cached_thumbs.update(new_thumbs)
cache = {"timestamp": time.time(), "thumbnails": cached_thumbs}
save_commons_thumbnail_cache(cache)
# Build the result list
result: list[CommonsUpload] = []
for upload in uploads_with_mail:
title = upload["title"]
thumb_url = cached_thumbs.get(title, "")
if not thumb_url:
continue
mail_info = upload.get("_mail_info", {})
# Convert title to Commons URL
commons_url = f"https://commons.wikimedia.org/wiki/{title.replace(' ', '_')}"
result.append(
CommonsUpload(
title=title.replace("File:", "").rsplit(".", 1)[0],
thumb_url=thumb_url,
commons_url=commons_url,
flickr_url=upload.get("flickr_url", ""),
creator=upload.get("creator") or "Unknown",
timestamp=upload.get("timestamp", "")[:10],
wikipedia_url=mail_info.get("wikipedia_url", ""),
creator_profile_url=mail_info.get("creator_profile_url", ""),
)
)
return result
def is_valid_flickr_image_url(url: str) -> bool:
"""Check if URL is a valid Flickr static image URL."""
valid_prefixes = (
@ -121,7 +428,9 @@ def search_flickr(search_term: str, page: int = 1) -> SearchResult:
def parse_flickr_search_results(html: str, page: int = 1) -> SearchResult:
"""Parse Flickr search results HTML and extract photo data."""
empty_result = SearchResult(photos=[], total_photos=0, current_page=page, total_pages=0)
empty_result = SearchResult(
photos=[], total_photos=0, current_page=page, total_pages=0
)
# Find the modelExport JSON embedded in the page
start = html.find("modelExport:")
@ -265,10 +574,12 @@ def start() -> str:
"""Start form."""
enwp = flask.request.args.get("enwp")
if not enwp:
return flask.render_template("combined.html")
recent_uploads = get_recent_commons_uploads()
return flask.render_template("combined.html", recent_uploads=recent_uploads)
enwp = enwp.strip()
if not enwp:
return flask.render_template("combined.html")
recent_uploads = get_recent_commons_uploads()
return flask.render_template("combined.html", recent_uploads=recent_uploads)
input_is = "url" if enwiki in enwp else "title"