193 lines
6.4 KiB
Python
193 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Find UploadWizard contributions that are from Flickr and add them to the database.
|
|
|
|
Supports both UploadWizard comment styles:
|
|
- "User created page with UploadWizard" (older)
|
|
- "Uploaded a work by ... with UploadWizard" (newer, often includes Flickr URL)
|
|
|
|
If a Flickr URL is not present in the contribution comment, queries Commons API
|
|
to check if the image source is Flickr (by checking the Credit field).
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
import time
|
|
|
|
import requests
|
|
|
|
from flickr_mail.database import init_db, get_session
|
|
from flickr_mail.models import Contribution, FlickrUpload, SentMessage
|
|
from flickr_mail.url_utils import normalize_flickr_url
|
|
|
|
# Commons API endpoint queried by get_image_metadata().
COMMONS_API = "https://commons.wikimedia.org/w/api.php"
# Value sent as the User-Agent header on Commons API requests.
USER_AGENT = "FlickrMail/1.0 (https://edwardbetts.com/flickr_mail/; edward@4angle.com)"
|
|
|
|
|
|
def extract_flickr_url_from_credit(credit: str) -> str | None:
|
|
"""Extract Flickr URL from the Credit field HTML."""
|
|
pattern = r'https?://(?:www\.)?flickr\.com/photos/[^/"\s<>]+/\d+'
|
|
match = re.search(pattern, credit)
|
|
return match.group(0) if match else None
|
|
|
|
|
|
def extract_flickr_url_from_comment(comment: str) -> str | None:
|
|
"""Extract Flickr URL directly from a contribution comment."""
|
|
pattern = r'https?://(?:www\.)?flickr\.com/photos/[^/\s]+/\d+'
|
|
match = re.search(pattern, comment or "")
|
|
return match.group(0) if match else None
|
|
|
|
|
|
def get_image_metadata(titles: list[str]) -> dict[str, dict]:
    """Fetch image metadata from Commons API for multiple titles.

    Args:
        titles: Page titles to look up. All titles are sent in a single
            request, so callers should pass at most 50 at a time.

    Returns:
        A mapping of title to ``{"credit": ..., "artist": ...}`` holding the
        ``value`` of the Credit and Artist extmetadata fields (``""`` when a
        field is absent). Pages without ``imageinfo`` are omitted. Each entry
        is keyed by the title the API reports and, when the API normalized a
        requested title, also by the title as it was requested. Returns an
        empty dict when ``titles`` is empty or the request fails.
    """
    if not titles:
        return {}

    # Commons API allows up to 50 titles per request
    params = {
        "action": "query",
        "titles": "|".join(titles),
        "prop": "imageinfo",
        "iiprop": "extmetadata",
        "format": "json",
    }

    headers = {"User-Agent": USER_AGENT}

    try:
        response = requests.get(COMMONS_API, params=params, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, json.JSONDecodeError) as e:
        # Best effort: report the failure and let the caller carry on
        # without metadata for this batch.
        print(f"API error: {e}")
        return {}

    query = data.get("query", {})

    results = {}
    for page in query.get("pages", {}).values():
        title = page.get("title", "")
        imageinfo = page.get("imageinfo", [])
        if imageinfo:
            extmeta = imageinfo[0].get("extmetadata", {})
            results[title] = {
                "credit": extmeta.get("Credit", {}).get("value", ""),
                "artist": extmeta.get("Artist", {}).get("value", ""),
            }

    # The API reports titles it rewrote (e.g. underscores to spaces) in
    # "normalized". Alias those entries under the requested spelling so a
    # lookup by the caller's own title still finds the metadata.
    for entry in query.get("normalized", []):
        requested = entry.get("from")
        canonical = entry.get("to")
        if canonical in results and requested not in results:
            results[requested] = results[canonical]

    return results
|
|
|
|
|
|
def clean_artist_name(artist_html: str) -> str:
    """Extract plain text artist name from HTML.

    Args:
        artist_html: HTML fragment holding the artist name.

    Returns:
        The text with tags removed, HTML entities decoded (``&amp;`` becomes
        ``&``) and runs of whitespace collapsed to single spaces.
    """
    import html

    # Remove HTML tags
    text = re.sub(r"<[^>]+>", "", artist_html)
    # Decode entities only after stripping tags, so an escaped "&lt;b&gt;"
    # stays literal text instead of being removed as markup.
    text = html.unescape(text)
    # Clean up whitespace (also collapses non-breaking spaces from &nbsp;)
    text = " ".join(text.split())
    return text
|
|
|
|
|
|
def main():
    """Record UploadWizard file uploads that originate from Flickr.

    Loads the normalized Flickr URLs already stored, indexes sent messages
    by normalized URL, then walks the UploadWizard "File:" contributions in
    batches of 50. For each contribution the Flickr URL is taken from the
    comment, falling back to the Credit metadata; contributions without a
    URL, or whose normalized URL is already known, are skipped. New rows are
    committed once per batch. Any exception rolls the session back and is
    re-raised; the session is always closed.
    """
    init_db()
    session = get_session()

    try:
        # Normalized Flickr URLs already stored, used to skip duplicates.
        known_urls = set()
        for row in session.query(FlickrUpload.normalized_flickr_url).all():
            if row[0]:
                known_urls.add(row[0])
        print(f"Existing uploads: {session.query(FlickrUpload).count()}")
        print(f"Existing flickr URLs: {len(known_urls)}")

        # Index sent messages by normalized URL for FK linking, leaving out
        # messages with an empty URL or a subject starting with "Re:".
        message_query = session.query(SentMessage)
        message_query = message_query.filter(SentMessage.normalized_flickr_url != "")
        message_query = message_query.filter(~SentMessage.subject.startswith("Re:"))
        message_by_url = {}
        for sent in message_query.all():
            message_by_url[sent.normalized_flickr_url] = sent

        # UploadWizard file uploads, in either comment format:
        #   old: "User created page with UploadWizard"
        #   new: "Uploaded a work by ... with UploadWizard"
        candidate_query = session.query(Contribution)
        candidate_query = candidate_query.filter(
            Contribution.comment.contains("UploadWizard")
        )
        candidate_query = candidate_query.filter(
            Contribution.title.startswith("File:")
        )
        candidates = candidate_query.all()
        candidate_total = len(candidates)

        print(f"UploadWizard contributions to check: {candidate_total}")

        # Work through the candidates 50 at a time.
        added = 0
        batch_size = 50
        batch_total = (candidate_total + batch_size - 1) // batch_size

        for batch_number, start in enumerate(
            range(0, candidate_total, batch_size), start=1
        ):
            batch = candidates[start : start + batch_size]

            print(f"Processing batch {batch_number}/{batch_total}...")

            metadata = get_image_metadata([contrib.title for contrib in batch])

            for contrib in batch:
                info = metadata.get(contrib.title, {})

                # The comment wins; the Credit metadata is only a fallback.
                flickr_url = extract_flickr_url_from_comment(contrib.comment or "")
                if not flickr_url:
                    flickr_url = extract_flickr_url_from_credit(info.get("credit", ""))
                if not flickr_url:
                    continue

                normalized = normalize_flickr_url(flickr_url)
                if normalized in known_urls:
                    continue

                artist = info.get("artist", "")
                creator = clean_artist_name(artist) if artist else None

                # Sent message to link to, if one exists for this URL.
                sent = message_by_url.get(normalized) if normalized else None
                if sent:
                    wikipedia_url = sent.wikipedia_url
                    creator_profile_url = sent.creator_profile_url
                    sent_message_id = sent.message_id
                else:
                    wikipedia_url = ""
                    creator_profile_url = ""
                    sent_message_id = None

                upload = FlickrUpload(
                    pageid=contrib.pageid,
                    revid=contrib.revid,
                    title=contrib.title,
                    timestamp=contrib.timestamp,
                    flickr_url=flickr_url,
                    normalized_flickr_url=normalized,
                    creator=creator,
                    wikipedia_url=wikipedia_url,
                    creator_profile_url=creator_profile_url,
                    sent_message_id=sent_message_id,
                )
                session.add(upload)
                added += 1
                known_urls.add(normalized)
                print(f"  Found: {contrib.title[:50]} -> {flickr_url}")

            session.commit()

            # Rate limiting: pause between batches, but not after the last.
            if start + batch_size < candidate_total:
                time.sleep(0.5)

        total = session.query(FlickrUpload).count()
        print(f"\nFound {added} new Flickr uploads")
        print(f"Total: {total} uploads in database")

    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
|
|
|
|
|
|
# Run the import only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|