#!/usr/bin/python3
"""Find photos on flickr for Wikipedia articles and contact the photographer."""

import collections
import collections.abc
import dataclasses
import inspect
import json
import re
import sys
import time
import traceback
import typing
from pathlib import Path
from urllib.parse import quote, unquote

import flask
import requests
import werkzeug
import werkzeug.exceptions
from werkzeug.debug.tbtools import DebugTraceback

app = flask.Flask(__name__)
app.debug = False

enwiki = "en.wikipedia.org/wiki/"

# Path to Commons contributions data and sent mail
COMMONS_UPLOADS_FILE = (
    Path(__file__).parent / "commons_contributions" / "flickr_uploads.json"
)
COMMONS_CACHE_FILE = (
    Path(__file__).parent / "commons_contributions" / "thumbnail_cache.json"
)
SENT_MAIL_DIR = Path(__file__).parent / "sent_mail" / "messages"
SENT_MAIL_INDEX_FILE = Path(__file__).parent / "sent_mail" / "messages_index.json"
SENT_MAIL_INDEX_CACHE = (
    Path(__file__).parent / "commons_contributions" / "sent_mail_index.json"
)
COMMONS_CACHE_MAX_AGE = 86400 * 7  # Cache for 7 days
RECENT_UPLOADS_COUNT = 24

# User agent for Wikimedia API requests
WIKIMEDIA_USER_AGENT = (
    "FlickrMail/1.0 (https://edwardbetts.com/flickr_mail/; edward@4angle.com)"
)

WIKIPEDIA_API = "https://en.wikipedia.org/w/api.php"

# Seconds to wait on Flickr before giving up; without a timeout a stalled
# connection would block the request worker indefinitely.
FLICKR_REQUEST_TIMEOUT = 30

# Browser-like headers for Flickr requests
BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "Cache-Control": "max-age=0",
}

# Flickr license codes to human-readable names
# NOTE(review): codes 11-15 appear in the license sets below but have no name
# here, so they display as "License N" (or an empty name in the message view).
# Confirm the names against Flickr before adding them.
FLICKR_LICENSES = {
    0: "All Rights Reserved",
    1: "CC BY-NC-SA 2.0",
    2: "CC BY-NC 2.0",
    3: "CC BY-NC-ND 2.0",
    4: "CC BY 2.0",
    5: "CC BY-SA 2.0",
    6: "CC BY-ND 2.0",
    7: "No known copyright",
    8: "US Government",
    9: "CC0",
    10: "Public Domain",
    # CC 4.0 licenses (codes confirmed via Flickr)
    16: "CC BY-NC-ND 4.0",
}

# Non-free CC licenses (NC or ND restrictions)
NONFREE_CC_LICENSES = {1, 2, 3, 6, 11, 12, 13, 16}

# Wikipedia-compatible free licenses
FREE_LICENSES = {4, 5, 7, 8, 9, 10, 14, 15}

PHOTOS_PER_PAGE = 25

# Patterns used to pull the first photo / article link out of a sent message.
_FLICKR_PHOTO_URL_RE = re.compile(
    r"(?:https?://)?(?:www\.)?flickr\.com/photos/[^/\s]+/\d+"
)
_WIKIPEDIA_URL_RE = re.compile(
    r"(?:https?://)?(?:www\.)?en\.wikipedia\.org/wiki/[^\s<\])]+"
)

# Hosts that serve Flickr static images.
_FLICKR_IMAGE_HOSTS = frozenset(
    {"live.staticflickr.com", "c1.staticflickr.com", "c2.staticflickr.com"}
)
# farm1.staticflickr.com, farm2.staticflickr.com, etc.
_FLICKR_FARM_HOST_RE = re.compile(r"farm\d*\.staticflickr\.com")


@dataclasses.dataclass
class FlickrPhoto:
    """Represents a Flickr photo from search results."""

    id: str
    title: str
    path_alias: str
    owner_nsid: str
    username: str
    realname: str
    license: int
    thumb_url: str
    medium_url: str

    @property
    def flickr_url(self) -> str:
        """URL to the photo page on Flickr."""
        return f"https://flickr.com/photos/{self.path_alias}/{self.id}"

    @property
    def license_name(self) -> str:
        """Human-readable license name."""
        return FLICKR_LICENSES.get(self.license, f"License {self.license}")


@dataclasses.dataclass
class SearchResult:
    """Flickr search results with pagination metadata."""

    photos: list[FlickrPhoto]
    total_photos: int
    current_page: int
    total_pages: int


@dataclasses.dataclass
class CommonsUpload:
    """Represents a recent upload to Wikimedia Commons."""

    title: str
    thumb_url: str
    commons_url: str
    flickr_url: str
    creator: str
    timestamp: str
    wikipedia_url: str = ""
    creator_profile_url: str = ""

    @property
    def is_wikidata_item(self) -> bool:
        """Check if the wikipedia_url is actually a Wikidata item."""
        if not self.wikipedia_url:
            return False
        # Match Q followed by digits at the end of the URL
        return bool(re.search(r"/Q\d+$", self.wikipedia_url))

    @property
    def wiki_link_url(self) -> str:
        """Get the correct URL (Wikidata if it's a Q item, otherwise Wikipedia)."""
        if not self.wikipedia_url:
            return ""
        if self.is_wikidata_item:
            # Extract Q-id and build Wikidata URL
            match = re.search(r"(Q\d+)$", self.wikipedia_url)
            if match:
                return f"https://www.wikidata.org/wiki/{match.group(1)}"
        return self.wikipedia_url

    @property
    def wiki_link_label(self) -> str:
        """Get the label for the wiki link."""
        return "Wikidata item" if self.is_wikidata_item else "Wikipedia article"


def normalize_flickr_url(url: str) -> str:
    """Normalize a Flickr photo URL for comparison.

    Strips a leading scheme, a leading "www." and any trailing slashes.
    Only prefixes are removed, so a path alias that happens to contain
    "www." is left intact.

    Returns the normalized URL, or "" if it does not start with flickr.com.
    """
    # Remove protocol
    for scheme in ("https://", "http://"):
        url = url.removeprefix(scheme)

    # Remove www.
    url = url.removeprefix("www.")

    # Remove trailing slash
    url = url.rstrip("/")

    # Ensure it starts with flickr.com
    if not url.startswith("flickr.com"):
        return ""

    return url


def extract_urls_from_message(body: str) -> tuple[str, str]:
    """Extract flickr URL and Wikipedia URL from message body.

    Returns a (flickr_url, wikipedia_url) tuple holding the first match of
    each kind, prefixed with "https://" when the message omitted the scheme.
    Either element is "" when no such URL is present.
    """

    def first_url(pattern: re.Pattern[str]) -> str:
        """Return the first match of pattern in body with a scheme, or ""."""
        match = pattern.search(body)
        if match is None:
            return ""
        url = match.group(0)
        return url if url.startswith("http") else "https://" + url

    return first_url(_FLICKR_PHOTO_URL_RE), first_url(_WIKIPEDIA_URL_RE)


def build_sent_mail_index() -> dict[str, dict[str, str]]:
    """Build an index of sent mail: normalized_flickr_url -> {wikipedia_url, recipient}.

    Each entry also carries "creator_profile_url". The index is cached in
    SENT_MAIL_INDEX_CACHE and reused while the number of message files in
    SENT_MAIL_DIR is unchanged. Unreadable or malformed message files are
    skipped, and a failure to write the cache is ignored.
    """
    if not SENT_MAIL_DIR.exists():
        return {}

    json_files = list(SENT_MAIL_DIR.glob("*.json"))

    # Check if we have a cached index
    if SENT_MAIL_INDEX_CACHE.exists():
        try:
            with open(SENT_MAIL_INDEX_CACHE, encoding="utf-8") as f:
                cache = json.load(f)
        except (json.JSONDecodeError, OSError, UnicodeDecodeError):
            cache = None
        # Check if cache is still valid (compare file count)
        if isinstance(cache, dict) and cache.get("file_count") == len(json_files):
            return typing.cast(dict[str, dict[str, str]], cache.get("index", {}))

    index: dict[str, dict[str, str]] = {}

    for json_file in json_files:
        try:
            with open(json_file, encoding="utf-8") as f:
                message = json.load(f)
        except (json.JSONDecodeError, OSError, UnicodeDecodeError):
            continue
        if not isinstance(message, dict):
            continue

        # Skip replies - we want original requests
        subject = message.get("subject") or ""
        if subject.startswith("Re:"):
            continue

        flickr_url, wikipedia_url = extract_urls_from_message(
            message.get("body") or ""
        )
        if not flickr_url:
            continue

        normalized = normalize_flickr_url(flickr_url)
        if not normalized:
            continue

        # Extract creator profile URL from flickr URL
        # flickr.com/photos/username/12345 -> flickr.com/photos/username
        parts = flickr_url.split("/")
        creator_profile = ""
        if "photos" in parts:
            user_pos = parts.index("photos") + 1
            if user_pos < len(parts):
                creator_profile = f"https://www.flickr.com/photos/{parts[user_pos]}"

        index[normalized] = {
            "wikipedia_url": wikipedia_url,
            "creator_profile_url": creator_profile,
            "recipient": message.get("recipient") or "",
        }

    # Cache the index
    try:
        with open(SENT_MAIL_INDEX_CACHE, "w", encoding="utf-8") as f:
            json.dump({"file_count": len(json_files), "index": index}, f)
    except OSError:
        pass  # Best effort: the index is simply rebuilt next time

    return index


def load_commons_thumbnail_cache() -> dict[str, typing.Any]:
    """Load the thumbnail cache from disk.

    Returns an empty cache (timestamp 0, no thumbnails) when the file is
    missing, unreadable, or does not contain a JSON object.
    """
    empty_cache: dict[str, typing.Any] = {"timestamp": 0, "thumbnails": {}}
    if not COMMONS_CACHE_FILE.exists():
        return empty_cache
    try:
        with open(COMMONS_CACHE_FILE, encoding="utf-8") as f:
            cache = json.load(f)
    except (json.JSONDecodeError, OSError, UnicodeDecodeError):
        return empty_cache
    if not isinstance(cache, dict):
        return empty_cache
    return typing.cast(dict[str, typing.Any], cache)


def save_commons_thumbnail_cache(cache: dict[str, typing.Any]) -> None:
    """Save the thumbnail cache to disk."""
    try:
        with open(COMMONS_CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(cache, f)
    except OSError:
        pass  # Ignore cache write errors


def fetch_commons_thumbnails(titles: list[str]) -> dict[str, str]:
    """Fetch thumbnail URLs from Commons API for the given file titles.

    Returns a mapping of page title (as reported by the API) to thumbnail
    URL. Titles without image info are omitted. Any request or decoding
    failure yields an empty mapping.
    """
    if not titles:
        return {}

    # Commons API allows up to 50 titles per request
    params = {
        "action": "query",
        "titles": "|".join(titles),
        "prop": "imageinfo",
        "iiprop": "url",
        "iiurlwidth": 150,
        "format": "json",
    }

    headers = {"User-Agent": WIKIMEDIA_USER_AGENT}

    try:
        response = requests.get(
            "https://commons.wikimedia.org/w/api.php",
            params=params,
            headers=headers,
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, json.JSONDecodeError):
        return {}

    thumbnails: dict[str, str] = {}
    pages = data.get("query", {}).get("pages", {})
    for page in pages.values():
        imageinfo = page.get("imageinfo", [])
        if not imageinfo:
            continue
        thumb_url = imageinfo[0].get("thumburl", "")
        if thumb_url:
            thumbnails[page.get("title", "")] = thumb_url

    return thumbnails


def get_recent_commons_uploads() -> tuple[list[CommonsUpload], int]:
    """Get recent Commons uploads with thumbnails, filtered to those contacted via Flickr mail.

    Returns a tuple of (uploads_list, total_count) where total_count is the total number
    of uploads obtained via Flickr mail (not just the ones returned).
    """
    if not COMMONS_UPLOADS_FILE.exists():
        return [], 0

    try:
        with open(COMMONS_UPLOADS_FILE, encoding="utf-8") as f:
            all_uploads = json.load(f)
    except (json.JSONDecodeError, OSError, UnicodeDecodeError):
        return [], 0

    # Build sent mail index
    sent_mail_index = build_sent_mail_index()

    # Filter uploads to only those with matching sent mail
    # Count all matches, but only keep RECENT_UPLOADS_COUNT for display
    matched: list[tuple[dict[str, typing.Any], dict[str, str]]] = []
    total_matched = 0
    for upload in all_uploads:
        normalized = normalize_flickr_url(upload.get("flickr_url", ""))
        if not normalized or normalized not in sent_mail_index:
            continue
        total_matched += 1
        if len(matched) < RECENT_UPLOADS_COUNT:
            matched.append((upload, sent_mail_index[normalized]))

    if not matched:
        return [], 0

    # Load cache and check if it's still valid
    cache = load_commons_thumbnail_cache()
    cache_timestamp = cache.get("timestamp", 0)
    cache_expired = time.time() - cache_timestamp > COMMONS_CACHE_MAX_AGE
    cached_thumbs = cache.get("thumbnails", {})

    # Find which titles need fetching
    titles = [upload["title"] for upload, _ in matched]
    titles_to_fetch = [t for t in titles if t not in cached_thumbs]

    # Fetch missing thumbnails or refresh if cache is old
    if titles_to_fetch or cache_expired:
        new_thumbs = fetch_commons_thumbnails(
            titles if cache_expired else titles_to_fetch
        )
        cached_thumbs.update(new_thumbs)
        # Only a full refresh resets the age; topping up a few missing titles
        # must not keep the older entries alive forever.
        save_commons_thumbnail_cache(
            {
                "timestamp": time.time() if cache_expired else cache_timestamp,
                "thumbnails": cached_thumbs,
            }
        )

    # Build the result list
    result: list[CommonsUpload] = []
    for upload, mail_info in matched:
        title = upload["title"]
        thumb_url = cached_thumbs.get(title, "")
        if not thumb_url:
            continue

        # Convert title to Commons URL
        commons_url = f"https://commons.wikimedia.org/wiki/{title.replace(' ', '_')}"

        result.append(
            CommonsUpload(
                # Display name: drop the namespace prefix and the file extension
                title=title.removeprefix("File:").rsplit(".", 1)[0],
                thumb_url=thumb_url,
                commons_url=commons_url,
                flickr_url=upload.get("flickr_url", ""),
                creator=upload.get("creator") or "Unknown",
                timestamp=(upload.get("timestamp") or "")[:10],
                wikipedia_url=mail_info.get("wikipedia_url", ""),
                creator_profile_url=mail_info.get("creator_profile_url", ""),
            )
        )

    return result, total_matched


def get_previous_messages(flickr_user: str, flickr_username: str) -> list[dict]:
    """Get previous messages sent to a Flickr user.

    Checks both the display name (flickr_user) and username (flickr_username)
    against the recipient field in the messages index.

    Returns the matching message entries, or an empty list when the index
    file is missing or unreadable.
    """
    if not SENT_MAIL_INDEX_FILE.exists():
        return []

    try:
        with open(SENT_MAIL_INDEX_FILE, encoding="utf-8") as f:
            messages = json.load(f)
    except (json.JSONDecodeError, OSError, UnicodeDecodeError):
        return []

    # Normalize for case-insensitive comparison; empty names never match
    wanted = {name.lower() for name in (flickr_user, flickr_username) if name}
    if not wanted:
        return []

    return [
        msg
        for msg in messages
        if isinstance(msg, dict) and (msg.get("recipient") or "").lower() in wanted
    ]


def parse_category_input(category_input: str) -> str | None:
    """Parse category title from URL or direct input.

    Returns the category title with 'Category:' prefix, or None if invalid
    (empty input, a bare prefix, or a Wikipedia URL that is not a category).
    """
    category_input = category_input.strip()
    if not category_input:
        return None

    # Handle URL format: https://en.wikipedia.org/wiki/Category:Example
    if "wikipedia.org" in category_input:
        match = re.search(r"/wiki/(Category:[^#?]+)", category_input)
        if match:
            return unquote(match.group(1)).replace("_", " ")
        return None

    # Handle direct input - add Category: prefix if missing
    name = category_input.removeprefix("Category:").replace("_", " ")
    if not name.strip():
        return None

    return f"Category:{name}"


@dataclasses.dataclass
class ArticleWithoutImage:
    """Represents a Wikipedia article that needs an image."""

    title: str
    pageid: int

    @property
    def wikipedia_url(self) -> str:
        """URL to the Wikipedia article."""
        return f"https://en.wikipedia.org/wiki/{self.title.replace(' ', '_')}"

    @property
    def search_url(self) -> str:
        """URL to search for this article in Flickr Mail."""
        return f"/?enwp={quote(self.title)}"


# Common non-content images to ignore when checking if an article has images
NON_CONTENT_IMAGE_PATTERNS = [
    "OOjs UI icon",
    "Commons-logo",
    "Symbol ",
    "Edit-ltr",
    "Ambox ",
    "Question book",
    "Wiki letter",
    "Text document",
    "Folder ",
    "Crystal ",
    "Nuvola ",
    "Gnome-",
    "Disambig ",
    "DAB ",
]


def has_content_image(images: list[dict]) -> bool:
    """Check if an article has a content image (not just UI icons/logos).

    An image counts as content when its title contains none of the
    NON_CONTENT_IMAGE_PATTERNS substrings.
    """
    return any(
        not any(pattern in img.get("title", "") for pattern in NON_CONTENT_IMAGE_PATTERNS)
        for img in images
    )


def get_articles_without_images(
    category: str, limit: int = 100
) -> tuple[list[ArticleWithoutImage], str | None]:
    """Get articles in a category that don't have images.

    Uses generator=categorymembers with prop=images to efficiently check
    multiple articles in a single API request.

    Returns a tuple of (articles_list, continue_token). On an API error the
    result is ([], None) and the error is printed.
    """
    # NOTE(review): only the first response is read; if the API paginates the
    # image lists, an article could be reported as image-less incorrectly.
    params = {
        "action": "query",
        "generator": "categorymembers",
        "gcmtitle": category,
        "gcmtype": "page",  # Only articles, not subcategories or files
        "gcmnamespace": "0",  # Main namespace only
        "gcmlimit": str(limit),
        "prop": "images",
        "imlimit": "max",  # Need enough to check all pages in batch
        "format": "json",
    }

    headers = {"User-Agent": WIKIMEDIA_USER_AGENT}

    try:
        response = requests.get(
            WIKIPEDIA_API, params=params, headers=headers, timeout=30
        )
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, json.JSONDecodeError) as e:
        print(f"Wikipedia API error: {e}")
        return [], None

    articles_without_images: list[ArticleWithoutImage] = []

    pages = data.get("query", {}).get("pages", {})
    for page in pages.values():
        # Skip if page has content images (not just UI icons)
        if has_content_image(page.get("images", [])):
            continue

        title = page.get("title", "")
        pageid = page.get("pageid", 0)

        if title and pageid:
            articles_without_images.append(
                ArticleWithoutImage(title=title, pageid=pageid)
            )

    # Sort by title for consistent display
    articles_without_images.sort(key=lambda a: a.title)

    # Get continue token if there are more results
    continue_token = data.get("continue", {}).get("gcmcontinue")

    return articles_without_images, continue_token


def is_valid_flickr_image_url(url: str) -> bool:
    """Check if URL is a valid Flickr static image URL.

    The URL must use https and its host component must be exactly one of the
    known static hosts or a farmN.staticflickr.com host. Checking the host
    itself (rather than a prefix plus a substring) rejects look-alikes such
    as https://farm.evil.com/.staticflickr.com/x.
    """
    scheme = "https://"
    if not url.startswith(scheme):
        return False

    # Everything between the scheme and the first "/" is the host component;
    # a URL with no path separator is rejected.
    host, slash, _ = url[len(scheme) :].partition("/")
    if not slash:
        return False

    return (
        host in _FLICKR_IMAGE_HOSTS
        or _FLICKR_FARM_HOST_RE.fullmatch(host) is not None
    )


def search_flickr(search_term: str, page: int = 1) -> SearchResult:
    """Search Flickr for photos matching the search term.

    The term is searched as an exact phrase. Raises requests.RequestException
    on a network failure, timeout, or non-success HTTP status.
    """
    encoded_term = quote(f'"{search_term}"')
    url = f"https://flickr.com/search/?view_all=1&text={encoded_term}&page={page}"

    response = requests.get(
        url, headers=BROWSER_HEADERS, timeout=FLICKR_REQUEST_TIMEOUT
    )
    response.raise_for_status()

    return parse_flickr_search_results(response.text, page)


def parse_flickr_search_results(html: str, page: int = 1) -> SearchResult:
    """Parse Flickr search results HTML and extract photo data.

    Looks for the JSON object that follows "modelExport:" in the page and
    reads the photo list from it. Returns an empty SearchResult when the
    marker, the JSON object, or the photo models are missing.
    """
    empty_result = SearchResult(
        photos=[], total_photos=0, current_page=page, total_pages=0
    )

    # Find the modelExport JSON embedded in the page
    marker = "modelExport:"
    marker_pos = html.find(marker)
    if marker_pos == -1:
        return empty_result

    json_start = marker_pos + len(marker)
    # Bounds-checked so a page truncated right after the marker cannot raise
    while json_start < len(html) and html[json_start].isspace():
        json_start += 1

    if not html.startswith("{", json_start):
        return empty_result

    # raw_decode parses one JSON value and ignores whatever script follows it
    try:
        data, _ = json.JSONDecoder().raw_decode(html, json_start)
    except json.JSONDecodeError:
        return empty_result

    # Extract photos from the parsed data
    photos: list[FlickrPhoto] = []

    main = data.get("main", {})
    photos_models = main.get("search-photos-lite-models", [])

    if not photos_models:
        return empty_result

    model_data = photos_models[0].get("data", {})
    photos_container = model_data.get("photos", {}).get("data", {})
    photos_data = photos_container.get("_data", [])
    total_photos = photos_container.get("totalItems", 0)

    # Calculate total pages (Flickr caps at 4000 results), rounding up
    reachable_photos = min(total_photos, 4000)
    total_pages = -(-reachable_photos // PHOTOS_PER_PAGE)

    for photo_entry in photos_data:
        # Skip None entries (placeholders from pagination)
        if photo_entry is None:
            continue

        pd = photo_entry.get("data", {})
        if not pd:
            continue

        # Prefer the "q"/"n" sizes, falling back to "sq"/"m"
        sizes = pd.get("sizes", {}).get("data", {})
        thumb_data = sizes.get("q", sizes.get("sq", {})).get("data", {})
        medium_data = sizes.get("n", sizes.get("m", {})).get("data", {})

        thumb_url = thumb_data.get("url", "")
        medium_url = medium_data.get("url", "")

        # Ensure URLs have protocol
        if thumb_url.startswith("//"):
            thumb_url = "https:" + thumb_url
        if medium_url.startswith("//"):
            medium_url = "https:" + medium_url

        photos.append(
            FlickrPhoto(
                id=str(pd.get("id", "")),
                title=pd.get("title", ""),
                path_alias=pd.get("pathAlias", ""),
                owner_nsid=pd.get("ownerNsid", ""),
                username=pd.get("username", ""),
                realname=pd.get("realname", ""),
                license=pd.get("license", 0),
                thumb_url=thumb_url,
                medium_url=medium_url,
            )
        )

    return SearchResult(
        photos=photos,
        total_photos=total_photos,
        current_page=page,
        total_pages=total_pages,
    )


@app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
    """Handle exception.

    Renders show_error.html with the traceback of the exception currently
    being handled and returns it with status 500.
    """
    _exc_type, exc_value, current_traceback = sys.exc_info()
    assert exc_value
    tb = DebugTraceback(exc_value)

    summary = tb.render_traceback_html(include_title=False)
    exc_lines = "".join(tb._te.format_exception_only())

    # The innermost frame is where the exception was raised
    last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
    last_frame_args = inspect.getargs(last_frame.f_code)

    return (
        flask.render_template(
            "show_error.html",
            plaintext=tb.render_traceback_text(),
            exception=exc_lines,
            exception_type=tb._te.exc_type.__name__,
            summary=summary,
            last_frame=last_frame,
            last_frame_args=last_frame_args,
        ),
        500,
    )


@app.route("/")
def start() -> str:
    """Start form.

    Without an "enwp" parameter, shows the recent uploads. With "enwp" only,
    shows Flickr search results for the article. With "flickr" as well, shows
    either upload options (free license) or a draft message to the
    photographer.

    Aborts with 400 when the "flickr" parameter is not a flickr.com photo
    URL, and with 502 when the photographer's NSID cannot be determined.
    """
    enwp = (flask.request.args.get("enwp") or "").strip()
    if not enwp:
        recent_uploads, total_uploads = get_recent_commons_uploads()
        return flask.render_template(
            "combined.html", recent_uploads=recent_uploads, total_uploads=total_uploads
        )

    wikipedia_url: str
    wiki_part1: str
    wiki_part2: str
    if enwiki in enwp:
        # Input is an article URL: split it into prefix and title
        title_start = enwp.find(enwiki) + len(enwiki)
        wiki_part1 = enwp[:title_start]
        wiki_part2 = unquote(enwp[title_start:])
        name = wiki_part2
        wikipedia_url = enwp
    else:
        # Input is an article title: build the URL from it
        name = enwp
        wiki_part1 = "https://" + enwiki
        wiki_part2 = name.replace(" ", "_")
        wikipedia_url = wiki_part1 + wiki_part2

    # Remove disambiguation suffix like "(academic)" for Flickr search
    name = name.replace("_", " ")
    if " (" in name:
        name = name[: name.find(" (")]

    # Get category param if coming from category search
    cat = flask.request.args.get("cat")

    flickr_url = flask.request.args.get("flickr")
    if not flickr_url:
        # Search Flickr for photos
        page = flask.request.args.get("page", 1, type=int)
        page = max(1, page)  # Ensure page is at least 1
        search_result = search_flickr(name, page)
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            search_result=search_result,
            cat=cat,
        )

    # Drop album/pool context such as /in/photostream/
    if "/in/" in flickr_url:
        flickr_url = flickr_url[: flickr_url.find("/in/")]

    # The URL comes from the query string, so validate it explicitly rather
    # than with assert (which is removed under -O).
    flickr_start = "https://flickr.com/photos/"
    if not flickr_url.startswith(flickr_start):
        flask.abort(400, "Invalid Flickr photo URL.")

    # Username is the first path segment after /photos/
    flickr_username = flickr_url[len(flickr_start) :].split("/", 1)[0]
    if not flickr_username:
        flask.abort(400, "Invalid Flickr photo URL.")

    nsid = flickr_usrename_to_nsid(flickr_username)
    if not nsid:
        flask.abort(502, "Could not determine the Flickr user's NSID.")

    # Get optional image URL for display, validate it's from Flickr
    img_url = flask.request.args.get("img")
    if img_url and not is_valid_flickr_image_url(img_url):
        img_url = None

    # Get flickr_user name and build profile URL
    flickr_user = flask.request.args.get("flickr_user", "")
    flickr_user_url = f"https://www.flickr.com/photos/{flickr_username}/"

    # Check for previous messages to this user
    previous_messages = get_previous_messages(flickr_user, flickr_username)

    # Get license code if provided
    license_code = flask.request.args.get("license", type=int)
    license_name = (
        FLICKR_LICENSES.get(license_code, "") if license_code is not None else ""
    )

    is_free_license = license_code in FREE_LICENSES
    is_nonfree_cc = license_code in NONFREE_CC_LICENSES

    # For free licenses, show upload options instead of message
    if is_free_license:
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            flickr_url=flickr_url,
            img_url=img_url,
            license_code=license_code,
            license_name=license_name,
            is_free_license=True,
            wikipedia_url=wikipedia_url,
            flickr_user=flickr_user,
            flickr_user_url=flickr_user_url,
            cat=cat,
            previous_messages=previous_messages,
        )

    msg = flask.render_template(
        "message.jinja",
        flickr_url=flickr_url,
        enwp=enwp,
        wikipedia_url=wikipedia_url,
        name=name,
        wiki_part1=wiki_part1,
        wiki_part2=wiki_part2,
        is_nonfree_cc=is_nonfree_cc,
        license_name=license_name,
    )

    subject = f"Request to use your photo of {name} on Wikipedia"

    # The template separates paragraphs with blank lines
    lines = msg.split("\n\n")

    return flask.render_template(
        "combined.html",
        name=name,
        enwp=enwp,
        flickr_url=flickr_url,
        subject=subject,
        lines=lines,
        nsid=nsid,
        img_url=img_url,
        license_code=license_code,
        license_name=license_name,
        flickr_user=flickr_user,
        flickr_user_url=flickr_user_url,
        cat=cat,
        previous_messages=previous_messages,
    )


@app.route("/category")
def category_search() -> str:
    """Find articles in a Wikipedia category that need images."""
    cat = flask.request.args.get("cat", "").strip()

    if not cat:
        return flask.render_template("category.html")

    category = parse_category_input(cat)
    if not category:
        return flask.render_template(
            "category.html",
            error="Invalid category format. Please enter a category name or URL.",
            cat=cat,
        )

    articles, continue_token = get_articles_without_images(category)

    # Get the display name (without Category: prefix)
    category_name = category.replace("Category:", "")

    return flask.render_template(
        "category.html",
        cat=cat,
        category=category,
        category_name=category_name,
        articles=articles,
        continue_token=continue_token,
    )


def get_params(line_iter: collections.abc.Iterable[str]) -> str:
    """Find and return params from flickr profile page.

    Returns the first line containing the params marker, starting at its
    first "{". Raises StopIteration when no line contains the marker.
    """
    look_for = 'params: {"isEditingTestimonial":false,'
    return next(line[line.find("{") :] for line in line_iter if look_for in line)


def flickr_usrename_to_nsid(username: str) -> str:
    """Get NSID from flickr username.

    Fetches the user's profile page and reads "nsid" from the params object
    embedded in it. Raises requests.RequestException on a network failure,
    timeout, or non-success HTTP status, and StopIteration when the page has
    no params line.
    """
    url = f"https://www.flickr.com/people/{username}/"
    r = requests.get(url, headers=BROWSER_HEADERS, timeout=FLICKR_REQUEST_TIMEOUT)
    r.raise_for_status()
    params_str = get_params(r.text.splitlines())
    # raw_decode stops at the end of the object and ignores the rest of the line
    params, _ = json.JSONDecoder().raw_decode(params_str)
    return typing.cast(str, params["nsid"])


if __name__ == "__main__":
    app.run(host="0.0.0.0")