flickr-mail/main.py
Edward Betts d59e67b55d Display total count of photos obtained via Flickr mail
Show the total number of successful Flickr mail requests on the home
page, not just the count of recent uploads displayed.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 18:17:33 +00:00

689 lines
21 KiB
Python
Executable file

#!/usr/bin/python3
"""Find photos on flickr for Wikipedia articles and contact the photographer."""
import collections
import dataclasses
import inspect
import json
import sys
import time
import traceback
import typing
from pathlib import Path
from urllib.parse import quote, unquote
import flask
import requests
import werkzeug
from werkzeug.debug.tbtools import DebugTraceback
import re
app = flask.Flask(__name__)
app.debug = False

# Prefix used to recognise English Wikipedia article URLs in user input
enwiki = "en.wikipedia.org/wiki/"

# Path to Commons contributions data and sent mail
COMMONS_UPLOADS_FILE = (
    Path(__file__).parent / "commons_contributions" / "flickr_uploads.json"
)
COMMONS_CACHE_FILE = (
    Path(__file__).parent / "commons_contributions" / "thumbnail_cache.json"
)
SENT_MAIL_DIR = Path(__file__).parent / "sent_mail" / "messages"
SENT_MAIL_INDEX_CACHE = (
    Path(__file__).parent / "commons_contributions" / "sent_mail_index.json"
)
COMMONS_CACHE_MAX_AGE = 86400 * 7  # Cache for 7 days
# Maximum number of recent uploads displayed on the home page
RECENT_UPLOADS_COUNT = 24

# User agent for Commons API requests
COMMONS_USER_AGENT = (
    "FlickrMail/1.0 (https://edwardbetts.com/flickr_mail/; edward@4angle.com)"
)

# Browser-like headers for Flickr requests
BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "Cache-Control": "max-age=0",
}

# Flickr license codes to human-readable names
FLICKR_LICENSES = {
    0: "All Rights Reserved",
    1: "CC BY-NC-SA",
    2: "CC BY-NC",
    3: "CC BY-NC-ND",
    4: "CC BY",
    5: "CC BY-SA",
    6: "CC BY-ND",
    7: "No known copyright",
    8: "US Government",
    9: "CC0",
    10: "Public Domain",
}

# Number of results per Flickr search page (used for pagination maths)
PHOTOS_PER_PAGE = 25
@dataclasses.dataclass
class FlickrPhoto:
    """A single photo taken from Flickr search results."""

    id: str
    title: str
    path_alias: str
    owner_nsid: str
    username: str
    realname: str
    license: int
    thumb_url: str
    medium_url: str

    @property
    def flickr_url(self) -> str:
        """Link to this photo's page on Flickr."""
        return "https://flickr.com/photos/" + self.path_alias + "/" + self.id

    @property
    def license_name(self) -> str:
        """Human-readable name for the photo's numeric license code."""
        fallback = f"License {self.license}"
        return FLICKR_LICENSES.get(self.license, fallback)
@dataclasses.dataclass
class SearchResult:
    """One page of Flickr search results plus pagination metadata."""

    photos: list[FlickrPhoto]  # photos on the current page
    total_photos: int  # total matches reported by Flickr
    current_page: int  # 1-based page number
    total_pages: int  # page count (Flickr caps results at 4000)
@dataclasses.dataclass
class CommonsUpload:
    """A recent Wikimedia Commons upload sourced from a Flickr photo."""

    title: str
    thumb_url: str
    commons_url: str
    flickr_url: str
    creator: str
    timestamp: str
    wikipedia_url: str = ""
    creator_profile_url: str = ""

    @property
    def is_wikidata_item(self) -> bool:
        """True when wikipedia_url actually points at a Wikidata Q item."""
        url = self.wikipedia_url
        # A Wikidata item link ends in /Q followed by digits
        return bool(url) and re.search(r"/Q\d+$", url) is not None

    @property
    def wiki_link_url(self) -> str:
        """URL to link to: Wikidata for Q items, otherwise the stored URL."""
        if not self.wikipedia_url:
            return ""
        if self.is_wikidata_item:
            found = re.search(r"(Q\d+)$", self.wikipedia_url)
            if found is not None:
                return f"https://www.wikidata.org/wiki/{found.group(1)}"
        return self.wikipedia_url

    @property
    def wiki_link_label(self) -> str:
        """Link text matching wiki_link_url."""
        if self.is_wikidata_item:
            return "Wikidata item"
        return "Wikipedia article"
def normalize_flickr_url(url: str) -> str:
    """Normalize a Flickr photo URL for comparison.

    Strips the scheme, "www." prefix and any trailing slash.  Anything that
    does not resolve to a flickr.com URL normalizes to the empty string.
    """
    for prefix in ("https://", "http://"):
        url = url.replace(prefix, "")
    url = url.replace("www.", "").rstrip("/")
    return url if url.startswith("flickr.com") else ""
def extract_urls_from_message(body: str) -> tuple[str, str]:
    """Extract the first Flickr photo URL and Wikipedia URL from a message body.

    Either value is the empty string when no match is found; matches that
    lack a scheme are normalized to https.
    """

    def first_url(pattern: str) -> str:
        # Take the first match and make sure it carries a scheme
        found = re.findall(pattern, body)
        if not found:
            return ""
        url = found[0]
        return url if url.startswith("http") else "https://" + url

    flickr_url = first_url(r"(?:https?://)?(?:www\.)?flickr\.com/photos/[^/\s]+/\d+")
    wikipedia_url = first_url(
        r"(?:https?://)?(?:www\.)?en\.wikipedia\.org/wiki/[^\s<\])]+"
    )
    return flickr_url, wikipedia_url
def build_sent_mail_index() -> dict[str, dict[str, str]]:
    """Build an index of sent mail: normalized_flickr_url -> {wikipedia_url, recipient}.

    Scans every *.json message in SENT_MAIL_DIR (skipping "Re:" replies),
    extracts the Flickr and Wikipedia URLs from each message body, and caches
    the resulting index in SENT_MAIL_INDEX_CACHE.
    """
    if not SENT_MAIL_DIR.exists():
        return {}
    # Check if we have a cached index
    if SENT_MAIL_INDEX_CACHE.exists():
        try:
            with open(SENT_MAIL_INDEX_CACHE) as f:
                cache = json.load(f)
            # Check if cache is still valid (compare file count)
            # NOTE(review): this only detects added/removed messages; an edited
            # message with an unchanged file count would keep serving stale data.
            json_files = list(SENT_MAIL_DIR.glob("*.json"))
            if cache.get("file_count") == len(json_files):
                return cache.get("index", {})
        except (json.JSONDecodeError, OSError):
            pass  # unreadable cache: fall through and rebuild from scratch
    index: dict[str, dict[str, str]] = {}
    json_files = list(SENT_MAIL_DIR.glob("*.json"))
    for json_file in json_files:
        try:
            with open(json_file) as f:
                message = json.load(f)
        except (json.JSONDecodeError, OSError):
            continue  # skip unreadable message files
        # Skip replies - we want original requests
        subject = message.get("subject", "")
        if subject.startswith("Re:"):
            continue
        body = message.get("body", "")
        flickr_url, wikipedia_url = extract_urls_from_message(body)
        if not flickr_url:
            continue
        normalized = normalize_flickr_url(flickr_url)
        if not normalized:
            continue
        # Extract creator profile URL from flickr URL
        # flickr.com/photos/username/12345 -> flickr.com/photos/username
        parts = flickr_url.split("/")
        creator_profile = ""
        for i, part in enumerate(parts):
            if part == "photos" and i + 1 < len(parts):
                username = parts[i + 1]
                creator_profile = f"https://www.flickr.com/photos/{username}"
                break
        index[normalized] = {
            "wikipedia_url": wikipedia_url,
            "creator_profile_url": creator_profile,
            "recipient": message.get("recipient", ""),
        }
    # Cache the index
    try:
        with open(SENT_MAIL_INDEX_CACHE, "w") as f:
            json.dump({"file_count": len(json_files), "index": index}, f)
    except OSError:
        pass  # caching is best-effort; the index is still returned
    return index
def load_commons_thumbnail_cache() -> dict[str, typing.Any]:
    """Load the thumbnail cache from disk, or an empty cache on any failure."""
    empty: dict[str, typing.Any] = {"timestamp": 0, "thumbnails": {}}
    if not COMMONS_CACHE_FILE.exists():
        return empty
    try:
        data = json.loads(COMMONS_CACHE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return empty
    return typing.cast(dict[str, typing.Any], data)
def save_commons_thumbnail_cache(cache: dict[str, typing.Any]) -> None:
    """Write the thumbnail cache to disk, silently ignoring write failures."""
    try:
        COMMONS_CACHE_FILE.write_text(json.dumps(cache))
    except OSError:
        # Caching is best-effort; a failed write is not fatal
        pass
def fetch_commons_thumbnails(titles: list[str]) -> dict[str, str]:
    """Fetch 150px thumbnail URLs from the Commons API for the given file titles.

    The Commons API accepts at most 50 titles per request, so titles are
    requested in batches of 50 (the original code sent them all in one
    request, silently truncating anything past the limit).  Returns a
    mapping of title -> thumbnail URL; request failures skip that batch
    rather than raising, so the result may be partial.
    """
    thumbnails: dict[str, str] = {}
    headers = {"User-Agent": COMMONS_USER_AGENT}
    for start in range(0, len(titles), 50):
        batch = titles[start : start + 50]
        params = {
            "action": "query",
            "titles": "|".join(batch),
            "prop": "imageinfo",
            "iiprop": "url",
            "iiurlwidth": 150,
            "format": "json",
        }
        try:
            response = requests.get(
                "https://commons.wikimedia.org/w/api.php",
                params=params,
                headers=headers,
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, json.JSONDecodeError):
            continue  # skip this batch; keep whatever we already have
        pages = data.get("query", {}).get("pages", {})
        for page in pages.values():
            title = page.get("title", "")
            imageinfo = page.get("imageinfo", [])
            if imageinfo:
                thumb_url = imageinfo[0].get("thumburl", "")
                if thumb_url:
                    thumbnails[title] = thumb_url
    return thumbnails
def get_recent_commons_uploads() -> tuple[list[CommonsUpload], int]:
    """Get recent Commons uploads with thumbnails, filtered to those contacted via Flickr mail.

    Returns a tuple of (uploads_list, total_count) where total_count is the total number
    of uploads obtained via Flickr mail (not just the ones returned).
    """
    if not COMMONS_UPLOADS_FILE.exists():
        return [], 0
    try:
        with open(COMMONS_UPLOADS_FILE) as f:
            all_uploads = json.load(f)
    except (json.JSONDecodeError, OSError):
        return [], 0
    # Build sent mail index
    sent_mail_index = build_sent_mail_index()
    # Filter uploads to only those with matching sent mail
    # Count all matches, but only keep RECENT_UPLOADS_COUNT for display
    uploads_with_mail: list[dict[str, typing.Any]] = []
    total_matched = 0
    for upload in all_uploads:
        flickr_url = upload.get("flickr_url", "")
        normalized = normalize_flickr_url(flickr_url)
        if normalized and normalized in sent_mail_index:
            total_matched += 1
            if len(uploads_with_mail) < RECENT_UPLOADS_COUNT:
                # Stash the matching mail details for the result-building pass
                upload["_mail_info"] = sent_mail_index[normalized]
                uploads_with_mail.append(upload)
    if not uploads_with_mail:
        return [], 0
    # Load cache and check if it's still valid
    cache = load_commons_thumbnail_cache()
    cache_age = time.time() - cache.get("timestamp", 0)
    cached_thumbs = cache.get("thumbnails", {})
    # Find which titles need fetching
    titles = [u["title"] for u in uploads_with_mail]
    titles_to_fetch = [t for t in titles if t not in cached_thumbs]
    # Fetch missing thumbnails or refresh if cache is old
    if titles_to_fetch or cache_age > COMMONS_CACHE_MAX_AGE:
        # When the cache has expired, refresh every title, not just missing ones
        new_thumbs = fetch_commons_thumbnails(
            titles if cache_age > COMMONS_CACHE_MAX_AGE else titles_to_fetch
        )
        cached_thumbs.update(new_thumbs)
        cache = {"timestamp": time.time(), "thumbnails": cached_thumbs}
        save_commons_thumbnail_cache(cache)
    # Build the result list
    result: list[CommonsUpload] = []
    for upload in uploads_with_mail:
        title = upload["title"]
        thumb_url = cached_thumbs.get(title, "")
        if not thumb_url:
            # Skip uploads whose thumbnail could not be fetched
            continue
        mail_info = upload.get("_mail_info", {})
        # Convert title to Commons URL
        commons_url = f"https://commons.wikimedia.org/wiki/{title.replace(' ', '_')}"
        result.append(
            CommonsUpload(
                # Strip the "File:" prefix and the file extension for display
                title=title.replace("File:", "").rsplit(".", 1)[0],
                thumb_url=thumb_url,
                commons_url=commons_url,
                flickr_url=upload.get("flickr_url", ""),
                creator=upload.get("creator") or "Unknown",
                # Keep only the date part (YYYY-MM-DD) of the timestamp
                timestamp=upload.get("timestamp", "")[:10],
                wikipedia_url=mail_info.get("wikipedia_url", ""),
                creator_profile_url=mail_info.get("creator_profile_url", ""),
            )
        )
    return result, total_matched
def is_valid_flickr_image_url(url: str) -> bool:
    """Check that *url* is an image URL on one of Flickr's static photo hosts.

    This validates a user-supplied ``img`` query parameter before it is
    rendered, so the host check must be strict.  The original substring test
    (``".staticflickr.com/" in url``) accepted lookalike hosts such as
    ``https://farm.evil.example/x.staticflickr.com/``; the farm pattern is
    now anchored to the start of the URL.
    """
    fixed_hosts = (
        "https://live.staticflickr.com/",
        "https://c1.staticflickr.com/",
        "https://c2.staticflickr.com/",
    )
    if url.startswith(fixed_hosts):
        return True
    # farm1.staticflickr.com, farm2.staticflickr.com, etc.
    return bool(re.match(r"https://farm\d+\.staticflickr\.com/", url))
def search_flickr(search_term: str, page: int = 1) -> SearchResult:
    """Search Flickr for photos matching *search_term* (quoted as a phrase).

    Fetches the requested results page with browser-like headers and parses
    the JSON model embedded in the returned HTML.  Raises
    ``requests.HTTPError`` on a non-2xx response.
    """
    encoded_term = quote(f'"{search_term}"')
    url = f"https://flickr.com/search/?view_all=1&text={encoded_term}&page={page}"
    # Timeout matches the module's other outbound requests; without it a
    # slow Flickr response would hang the request handler indefinitely.
    response = requests.get(url, headers=BROWSER_HEADERS, timeout=10)
    response.raise_for_status()
    return parse_flickr_search_results(response.text, page)
def parse_flickr_search_results(html: str, page: int = 1) -> SearchResult:
    """Parse Flickr search results HTML and extract photo data.

    Flickr embeds its view model as a ``modelExport:`` JSON object in the
    page.  This locates that marker, extracts the balanced JSON object by
    brace counting (string- and escape-aware), and converts each photo entry
    into a FlickrPhoto.  Returns an empty SearchResult whenever the marker or
    the expected structure is missing.

    Fixes over the original: the whitespace-skip loop is bounds-checked (a
    marker at the very end of the input previously raised IndexError), and a
    dead ``not escape_next`` condition was removed.
    """
    empty_result = SearchResult(
        photos=[], total_photos=0, current_page=page, total_pages=0
    )
    # Find the modelExport JSON embedded in the page
    start = html.find("modelExport:")
    if start == -1:
        return empty_result
    start += len("modelExport:")
    # Skip whitespace after the marker, guarding against end-of-input
    while start < len(html) and html[start].isspace():
        start += 1
    if start >= len(html):
        return empty_result
    # Parse the JSON by counting braces outside of string literals
    brace_count = 0
    i = start
    in_string = False
    escape_next = False
    while i < len(html):
        char = html[i]
        if escape_next:
            # Previous char was a backslash: this char is literal
            escape_next = False
            i += 1
            continue
        if char == "\\" and in_string:
            escape_next = True
            i += 1
            continue
        if char == '"':
            in_string = not in_string
        elif not in_string:
            if char == "{":
                brace_count += 1
            elif char == "}":
                brace_count -= 1
                if brace_count == 0:
                    json_str = html[start : i + 1]
                    break
        i += 1
    else:
        # Ran off the end without balancing the braces
        return empty_result
    try:
        data = json.loads(json_str)
    except json.JSONDecodeError:
        return empty_result
    # Extract photos from the parsed data
    photos: list[FlickrPhoto] = []
    main = data.get("main", {})
    photos_models = main.get("search-photos-lite-models", [])
    if not photos_models:
        return empty_result
    model_data = photos_models[0].get("data", {})
    photos_container = model_data.get("photos", {}).get("data", {})
    photos_data = photos_container.get("_data", [])
    total_photos = photos_container.get("totalItems", 0)
    # Calculate total pages (Flickr caps at 4000 results)
    capped = min(total_photos, 4000)
    total_pages = capped // PHOTOS_PER_PAGE
    if capped % PHOTOS_PER_PAGE:
        total_pages += 1
    for photo_entry in photos_data:
        # Skip None entries (placeholders from pagination)
        if photo_entry is None:
            continue
        pd = photo_entry.get("data", {})
        if not pd:
            continue
        # Prefer the square 150px ("q") thumb and small 320px ("n") medium,
        # falling back to the older "sq"/"m" size keys
        sizes = pd.get("sizes", {}).get("data", {})
        thumb_data = sizes.get("q", sizes.get("sq", {})).get("data", {})
        medium_data = sizes.get("n", sizes.get("m", {})).get("data", {})
        thumb_url = thumb_data.get("url", "")
        medium_url = medium_data.get("url", "")
        # Ensure protocol-relative URLs have a protocol
        if thumb_url.startswith("//"):
            thumb_url = "https:" + thumb_url
        if medium_url.startswith("//"):
            medium_url = "https:" + medium_url
        photos.append(
            FlickrPhoto(
                id=str(pd.get("id", "")),
                title=pd.get("title", ""),
                path_alias=pd.get("pathAlias", ""),
                owner_nsid=pd.get("ownerNsid", ""),
                username=pd.get("username", ""),
                realname=pd.get("realname", ""),
                license=pd.get("license", 0),
                thumb_url=thumb_url,
                medium_url=medium_url,
            )
        )
    return SearchResult(
        photos=photos,
        total_photos=total_photos,
        current_page=page,
        total_pages=total_pages,
    )
@app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
    """Render a detailed debug error page for unhandled exceptions."""
    # Recover the in-flight exception; the handler runs while it is active
    exec_type, exc_value, current_traceback = sys.exc_info()
    assert exc_value
    # DebugTraceback renders werkzeug's interactive-debugger style output.
    # NOTE(review): tb._te is a private werkzeug attribute and may break on
    # a werkzeug upgrade — confirm against the pinned version.
    tb = DebugTraceback(exc_value)
    summary = tb.render_traceback_html(include_title=False)
    exc_lines = "".join(tb._te.format_exception_only())
    # Innermost frame of the traceback and its argument names, for display
    last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
    last_frame_args = inspect.getargs(last_frame.f_code)
    return (
        flask.render_template(
            "show_error.html",
            plaintext=tb.render_traceback_text(),
            exception=exc_lines,
            exception_type=tb._te.exc_type.__name__,
            summary=summary,
            last_frame=last_frame,
            last_frame_args=last_frame_args,
        ),
        500,
    )
@app.route("/")
def start() -> str:
    """Front page, Flickr search results, or a drafted message to a photographer.

    Dispatches on query parameters:

    - no ``enwp``: show the home page with recent Commons uploads
    - ``enwp`` only: search Flickr for photos of the article subject
    - ``enwp`` and ``flickr``: draft the contact message for that photo

    Changes from the original: the duplicated empty-``enwp`` branches are
    merged, the leftover debug ``print(nsid)`` is removed, user-input
    ``assert`` statements are replaced with 400 responses (asserts are
    stripped under ``-O`` and otherwise surface as 500s), and the local
    ``start`` index no longer shadows the function name.
    """
    enwp = (flask.request.args.get("enwp") or "").strip()
    if not enwp:
        # Home page: recent uploads plus total count obtained via Flickr mail
        recent_uploads, total_uploads = get_recent_commons_uploads()
        return flask.render_template(
            "combined.html", recent_uploads=recent_uploads, total_uploads=total_uploads
        )
    # Accept either a full article URL or a bare article title
    input_is = "url" if enwiki in enwp else "title"
    wikipedia_url: str
    wiki_part1: str
    wiki_part2: str
    if input_is == "url":
        idx = enwp.find(enwiki) + len(enwiki)
        wiki_part2 = unquote(enwp[idx:])
        name = wiki_part2
        wiki_part1 = enwp[:idx]
        wikipedia_url = enwp
    else:
        name = enwp
        wiki_part1 = "https://" + enwiki
        wiki_part2 = name.replace(" ", "_")
        wikipedia_url = wiki_part1 + wiki_part2
    # Drop any disambiguation suffix, e.g. "Foo_(politician)" -> "Foo"
    if "_(" in name:
        name = name[: name.find("_(")]
    name = name.replace("_", " ")
    flickr_url = flask.request.args.get("flickr")
    if not flickr_url:
        # No photo chosen yet: show Flickr search results for the subject
        page = flask.request.args.get("page", 1, type=int)
        page = max(1, page)  # Ensure page is at least 1
        search_result = search_flickr(name, page)
        return flask.render_template(
            "combined.html",
            name=name,
            enwp=enwp,
            search_result=search_result,
        )
    # Strip context suffixes like ".../in/photostream" from the photo URL
    if "/in/" in flickr_url:
        flickr_url = flickr_url[: flickr_url.find("/in/")]
    flickr_start = "https://flickr.com/photos/"
    if not flickr_url.startswith(flickr_start):
        flask.abort(400, "flickr parameter must be a flickr.com photo URL")
    flickr_username = flickr_url[
        len(flickr_start) : flickr_url.find("/", len(flickr_start))
    ]
    nsid = flickr_usrename_to_nsid(flickr_username)
    if not nsid:
        flask.abort(400, "unable to resolve Flickr username to an NSID")
    # Get optional image URL for display, validate it's from Flickr
    img_url = flask.request.args.get("img")
    if img_url and not is_valid_flickr_image_url(img_url):
        img_url = None
    msg = flask.render_template(
        "message.jinja",
        flickr_url=flickr_url,
        enwp=enwp,
        wikipedia_url=wikipedia_url,
        name=name,
        wiki_part1=wiki_part1,
        wiki_part2=wiki_part2,
    )
    subject = f"Request to use your photo of {name} on Wikipedia"
    # The message template is paragraph-oriented: split on blank lines
    lines = msg.split("\n\n")
    return flask.render_template(
        "combined.html",
        name=name,
        enwp=enwp,
        flickr_url=flickr_url,
        subject=subject,
        lines=lines,
        nsid=nsid,
        img_url=img_url,
    )
def get_params(line_iter: collections.abc.Iterable[str]) -> str:
    """Find and return the params JSON from a flickr profile page.

    Scans the lines for the ``params: {...}`` marker and returns the text of
    the matching line from its first opening brace onwards.  Raises
    StopIteration when no line contains the marker.
    """
    marker = 'params: {"isEditingTestimonial":false,'
    for line in line_iter:
        if marker in line:
            return line[line.find("{") :]
    raise StopIteration
def flickr_usrename_to_nsid(username: str) -> str:
    """Get the NSID for a Flickr username by scraping their profile page.

    (Function name keeps its historical "usrename" typo because callers in
    this module reference it by that name.)
    """
    url = f"https://www.flickr.com/people/{username}/"
    # Timeout matches the module's other outbound requests; the original
    # call could hang the request handler indefinitely.
    r = requests.get(url, headers=BROWSER_HEADERS, timeout=10)
    params_str = get_params(r.text.splitlines())
    # raw_decode tolerates trailing JavaScript after the JSON object
    params, _ = json.JSONDecoder().raw_decode(params_str)
    return typing.cast(str, params["nsid"])
if __name__ == "__main__":
    # Run the development server, bound to all interfaces so it is reachable
    # from other hosts
    app.run(host="0.0.0.0")