Extract flickr_mail package with Mapped models and shared utilities

Move from JSON file storage to SQLite database using SQLAlchemy with
Mapped type hints. Deduplicate URL utility functions into shared
flickr_mail package.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Edward Betts 2026-02-07 13:07:23 +00:00
parent ac1b01ea68
commit 9f0fb01878
11 changed files with 1129 additions and 300 deletions

307
main.py
View file

@ -9,14 +9,17 @@ import sys
import time
import traceback
import typing
from pathlib import Path
from urllib.parse import quote, unquote
import flask
import requests
import werkzeug
from sqlalchemy import func
from werkzeug.debug.tbtools import DebugTraceback
from flickr_mail.database import get_session
from flickr_mail.models import FlickrUpload, SentMessage, ThumbnailCache
from flickr_mail.url_utils import extract_urls_from_message, normalize_flickr_url
import re
@ -26,18 +29,6 @@ app.debug = False
enwiki = "en.wikipedia.org/wiki/"
# Path to Commons contributions data and sent mail
COMMONS_UPLOADS_FILE = (
Path(__file__).parent / "commons_contributions" / "flickr_uploads.json"
)
COMMONS_CACHE_FILE = (
Path(__file__).parent / "commons_contributions" / "thumbnail_cache.json"
)
SENT_MAIL_DIR = Path(__file__).parent / "sent_mail" / "messages"
SENT_MAIL_INDEX_FILE = Path(__file__).parent / "sent_mail" / "messages_index.json"
SENT_MAIL_INDEX_CACHE = (
Path(__file__).parent / "commons_contributions" / "sent_mail_index.json"
)
COMMONS_CACHE_MAX_AGE = 86400 * 7 # Cache for 7 days
RECENT_UPLOADS_COUNT = 24
@ -165,132 +156,6 @@ class CommonsUpload:
return "Wikidata item" if self.is_wikidata_item else "Wikipedia article"
def normalize_flickr_url(url: str) -> str:
    """Normalize a Flickr photo URL for comparison.

    Strips the scheme, a leading ``www.``, and any trailing slash so that
    equivalent URLs compare equal.  Returns ``""`` for non-Flickr URLs.
    """
    # Strip scheme and "www." only as *prefixes*.  The previous
    # str.replace() approach removed these substrings anywhere in the
    # URL, which would mangle e.g. a path segment containing "www.".
    for prefix in ("https://", "http://"):
        url = url.removeprefix(prefix)
    url = url.removeprefix("www.")
    # Trailing slashes are not significant for comparison.
    url = url.rstrip("/")
    # Anything that is not a flickr.com URL normalizes to the empty string.
    if not url.startswith("flickr.com"):
        return ""
    return url
def extract_urls_from_message(body: str) -> tuple[str, str]:
    """Extract the first Flickr photo URL and Wikipedia URL from *body*.

    Either element of the returned ``(flickr_url, wikipedia_url)`` tuple is
    ``""`` when no matching URL is present.  Matched URLs are returned with
    an ``https://`` scheme prepended if the message omitted one.
    """

    def first_url(pattern: str) -> str:
        # Take only the first occurrence in the body, normalising the scheme.
        match = re.search(pattern, body)
        if match is None:
            return ""
        url = match.group(0)
        return url if url.startswith("http") else "https://" + url

    flickr_url = first_url(r"(?:https?://)?(?:www\.)?flickr\.com/photos/[^/\s]+/\d+")
    wikipedia_url = first_url(r"(?:https?://)?(?:www\.)?en\.wikipedia\.org/wiki/[^\s<\])]+")
    return flickr_url, wikipedia_url
def build_sent_mail_index() -> dict[str, dict[str, str]]:
    """Build an index of sent mail: normalized_flickr_url -> {wikipedia_url, recipient}."""
    if not SENT_MAIL_DIR.exists():
        return {}
    # Reuse the cached index when the number of message files is unchanged.
    if SENT_MAIL_INDEX_CACHE.exists():
        try:
            with open(SENT_MAIL_INDEX_CACHE) as fh:
                cached = json.load(fh)
        except (json.JSONDecodeError, OSError):
            cached = None
        if cached is not None:
            message_files = list(SENT_MAIL_DIR.glob("*.json"))
            if cached.get("file_count") == len(message_files):
                return cached.get("index", {})

    index: dict[str, dict[str, str]] = {}
    message_files = list(SENT_MAIL_DIR.glob("*.json"))
    for path in message_files:
        try:
            with open(path) as fh:
                message = json.load(fh)
        except (json.JSONDecodeError, OSError):
            continue
        # Replies ("Re: ...") are skipped; only original requests are indexed.
        if message.get("subject", "").startswith("Re:"):
            continue
        flickr_url, wikipedia_url = extract_urls_from_message(message.get("body", ""))
        if not flickr_url:
            continue
        normalized = normalize_flickr_url(flickr_url)
        if not normalized:
            continue
        # Derive the creator's profile URL from the photo URL:
        # flickr.com/photos/username/12345 -> flickr.com/photos/username
        creator_profile = ""
        segments = flickr_url.split("/")
        for pos, segment in enumerate(segments):
            if segment == "photos" and pos + 1 < len(segments):
                creator_profile = f"https://www.flickr.com/photos/{segments[pos + 1]}"
                break
        index[normalized] = {
            "wikipedia_url": wikipedia_url,
            "creator_profile_url": creator_profile,
            "recipient": message.get("recipient", ""),
        }

    # Best-effort cache write; failures are not fatal.
    try:
        with open(SENT_MAIL_INDEX_CACHE, "w") as fh:
            json.dump({"file_count": len(message_files), "index": index}, fh)
    except OSError:
        pass
    return index
def load_commons_thumbnail_cache() -> dict[str, typing.Any]:
    """Load the thumbnail cache from disk.

    Returns an empty cache structure when the file is missing or unreadable.
    """
    empty: dict[str, typing.Any] = {"timestamp": 0, "thumbnails": {}}
    if not COMMONS_CACHE_FILE.exists():
        return empty
    try:
        with open(COMMONS_CACHE_FILE) as fh:
            data = json.load(fh)
    except (json.JSONDecodeError, OSError):
        return empty
    return typing.cast(dict[str, typing.Any], data)
def save_commons_thumbnail_cache(cache: dict[str, typing.Any]) -> None:
    """Persist the thumbnail cache to disk.

    Writing is best-effort: OS-level failures are silently ignored so a
    full disk or permission problem never breaks page rendering.
    """
    try:
        with open(COMMONS_CACHE_FILE, "w") as fh:
            json.dump(cache, fh)
    except OSError:
        pass
def fetch_commons_thumbnails(titles: list[str]) -> dict[str, str]:
"""Fetch thumbnail URLs from Commons API for the given file titles."""
@ -340,79 +205,72 @@ def get_recent_commons_uploads() -> tuple[list[CommonsUpload], int]:
Returns a tuple of (uploads_list, total_count) where total_count is the total number
of uploads obtained via Flickr mail (not just the ones returned).
"""
if not COMMONS_UPLOADS_FILE.exists():
return [], 0
session = get_session()
try:
with open(COMMONS_UPLOADS_FILE) as f:
all_uploads = json.load(f)
except (json.JSONDecodeError, OSError):
return [], 0
# Build sent mail index
sent_mail_index = build_sent_mail_index()
# Filter uploads to only those with matching sent mail
# Count all matches, but only keep RECENT_UPLOADS_COUNT for display
uploads_with_mail: list[dict[str, typing.Any]] = []
total_matched = 0
for upload in all_uploads:
flickr_url = upload.get("flickr_url", "")
normalized = normalize_flickr_url(flickr_url)
if normalized and normalized in sent_mail_index:
total_matched += 1
if len(uploads_with_mail) < RECENT_UPLOADS_COUNT:
upload["_mail_info"] = sent_mail_index[normalized]
uploads_with_mail.append(upload)
if not uploads_with_mail:
return [], 0
# Load cache and check if it's still valid
cache = load_commons_thumbnail_cache()
cache_age = time.time() - cache.get("timestamp", 0)
cached_thumbs = cache.get("thumbnails", {})
# Find which titles need fetching
titles = [u["title"] for u in uploads_with_mail]
titles_to_fetch = [t for t in titles if t not in cached_thumbs]
# Fetch missing thumbnails or refresh if cache is old
if titles_to_fetch or cache_age > COMMONS_CACHE_MAX_AGE:
new_thumbs = fetch_commons_thumbnails(
titles if cache_age > COMMONS_CACHE_MAX_AGE else titles_to_fetch
query = (
session.query(FlickrUpload, SentMessage)
.join(SentMessage)
.order_by(FlickrUpload.timestamp.desc())
)
cached_thumbs.update(new_thumbs)
cache = {"timestamp": time.time(), "thumbnails": cached_thumbs}
save_commons_thumbnail_cache(cache)
total_matched = query.count()
if total_matched == 0:
return [], 0
# Build the result list
result: list[CommonsUpload] = []
for upload in uploads_with_mail:
title = upload["title"]
thumb_url = cached_thumbs.get(title, "")
if not thumb_url:
continue
recent = query.limit(RECENT_UPLOADS_COUNT).all()
mail_info = upload.get("_mail_info", {})
# Get thumbnails from cache
titles = [upload.title for upload, msg in recent]
now = int(time.time())
cached = {
tc.title: tc
for tc in session.query(ThumbnailCache)
.filter(ThumbnailCache.title.in_(titles))
.all()
}
# Convert title to Commons URL
commons_url = f"https://commons.wikimedia.org/wiki/{title.replace(' ', '_')}"
# Find titles needing fetch (missing or expired)
titles_to_fetch = [
t for t in titles
if t not in cached or (now - (cached[t].fetched_at or 0)) > COMMONS_CACHE_MAX_AGE
]
result.append(
CommonsUpload(
title=title.replace("File:", "").rsplit(".", 1)[0],
thumb_url=thumb_url,
commons_url=commons_url,
flickr_url=upload.get("flickr_url", ""),
creator=upload.get("creator") or "Unknown",
timestamp=upload.get("timestamp", "")[:10],
wikipedia_url=mail_info.get("wikipedia_url", ""),
creator_profile_url=mail_info.get("creator_profile_url", ""),
if titles_to_fetch:
new_thumbs = fetch_commons_thumbnails(titles_to_fetch)
for title, thumb_url in new_thumbs.items():
existing = cached.get(title)
if existing:
existing.thumb_url = thumb_url
existing.fetched_at = now
else:
tc = ThumbnailCache(title=title, thumb_url=thumb_url, fetched_at=now)
session.add(tc)
cached[title] = tc
session.commit()
result: list[CommonsUpload] = []
for upload, msg in recent:
thumb_url = cached[upload.title].thumb_url if upload.title in cached else ""
if not thumb_url:
continue
commons_url = f"https://commons.wikimedia.org/wiki/{upload.title.replace(' ', '_')}"
result.append(
CommonsUpload(
title=upload.title.replace("File:", "").rsplit(".", 1)[0],
thumb_url=thumb_url,
commons_url=commons_url,
flickr_url=upload.flickr_url or "",
creator=upload.creator or "Unknown",
timestamp=(upload.timestamp or "")[:10],
wikipedia_url=upload.wikipedia_url or "",
creator_profile_url=upload.creator_profile_url or "",
)
)
)
return result, total_matched
return result, total_matched
finally:
session.close()
def get_previous_messages(flickr_user: str, flickr_username: str) -> list[dict]:
@ -421,26 +279,33 @@ def get_previous_messages(flickr_user: str, flickr_username: str) -> list[dict]:
Checks both the display name (flickr_user) and username (flickr_username)
against the recipient field in the messages index.
"""
if not SENT_MAIL_INDEX_FILE.exists():
names = set()
if flickr_user:
names.add(flickr_user.lower())
if flickr_username:
names.add(flickr_username.lower())
if not names:
return []
session = get_session()
try:
with open(SENT_MAIL_INDEX_FILE) as f:
messages = json.load(f)
except (json.JSONDecodeError, OSError):
return []
# Normalize for case-insensitive comparison
flickr_user_lower = flickr_user.lower() if flickr_user else ""
flickr_username_lower = flickr_username.lower() if flickr_username else ""
matches = []
for msg in messages:
recipient = msg.get("recipient", "").lower()
if recipient and (recipient == flickr_user_lower or recipient == flickr_username_lower):
matches.append(msg)
return matches
messages = (
session.query(SentMessage)
.filter(func.lower(SentMessage.recipient).in_(names))
.all()
)
return [
{
"message_id": m.message_id,
"subject": m.subject,
"url": m.url,
"recipient": m.recipient,
"date": m.date,
}
for m in messages
]
finally:
session.close()
def parse_category_input(category_input: str) -> str | None: