#!/usr/bin/env python3
"""Download Wikimedia Commons contributions for a user.

Pages through the MediaWiki ``usercontribs`` API for ``USERNAME`` and
stores each revision in the local database, skipping revisions already
present so repeated runs only fetch new activity.
"""

import json
import time

import requests

from flickr_mail.database import init_db, get_session
from flickr_mail.models import Contribution

API_URL = "https://commons.wikimedia.org/w/api.php"
USERNAME = "Edward"

# Identify ourselves properly to Wikimedia (their API etiquette requires a
# descriptive User-Agent with contact information).
USER_AGENT = "CommonsContributionsDownloader/0.1 (edward@4angle.com)"

SESSION = requests.Session()
SESSION.headers.update({"User-Agent": USER_AGENT})

# requests has NO default timeout; without one a stalled connection hangs
# the script forever.
REQUEST_TIMEOUT = 30


def fetch_contributions(
    continue_token: str | None = None,
) -> tuple[list[dict], str | None]:
    """Fetch one batch of contributions from the API.

    Args:
        continue_token: Opaque ``uccontinue`` value from a previous call,
            or ``None`` to start from the newest contributions.

    Returns:
        A tuple of (list of contribution dicts, next continuation token).
        The token is ``None`` when there are no further batches.

    Raises:
        requests.HTTPError: On a non-2xx API response.
        requests.Timeout: If the API does not respond within REQUEST_TIMEOUT.
    """
    params = {
        "action": "query",
        "list": "usercontribs",
        "ucuser": USERNAME,
        "uclimit": "500",  # maximum allowed per request for normal users
        "ucprop": "ids|title|timestamp|comment|size|sizediff|flags|tags",
        "format": "json",
    }
    if continue_token:
        params["uccontinue"] = continue_token

    response = SESSION.get(API_URL, params=params, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    data = response.json()

    contributions = data.get("query", {}).get("usercontribs", [])

    # Continuation token is present only while more results are available.
    new_continue = data.get("continue", {}).get("uccontinue")

    return contributions, new_continue


def insert_contribution(session, c: dict) -> None:
    """Insert a contribution row (caller must ensure revid is new)."""
    session.add(Contribution(
        userid=c.get("userid"),
        user=c.get("user"),
        pageid=c.get("pageid"),
        revid=c.get("revid"),
        parentid=c.get("parentid"),
        ns=c.get("ns"),
        title=c.get("title"),
        timestamp=c.get("timestamp"),
        # The API serializes boolean flags as empty strings when set
        # ("minor": "") and omits them otherwise, so c.get() would yield
        # "" or None — both falsy. Record presence explicitly instead.
        minor="minor" in c,
        top="top" in c,
        comment=c.get("comment"),
        size=c.get("size"),
        sizediff=c.get("sizediff"),
        tags=json.dumps(c.get("tags", [])),
    ))


def main() -> None:
    """Download contributions, resuming incrementally on repeat runs."""
    init_db()
    session = get_session()

    try:
        existing_count = session.query(Contribution).count()

        # Get the latest timestamp to know where to resume from
        latest = (
            session.query(Contribution)
            .order_by(Contribution.timestamp.desc())
            .first()
        )

        if existing_count > 0 and latest:
            print(f"Database has {existing_count} contributions")
            print(f"Latest: {latest.timestamp}")
            print("Fetching new contributions...")
        else:
            print(f"Downloading contributions for user: {USERNAME}")

        batch_num = 0
        new_count = 0
        continue_token = None

        while True:
            batch_num += 1
            print(f"  Fetching batch {batch_num}...", end=" ", flush=True)

            contributions, continue_token = fetch_contributions(continue_token)

            if not contributions:
                print("no results")
                break

            # One DB query per batch to identify already-known revisions.
            revids = [c["revid"] for c in contributions if "revid" in c]
            existing_revids: set = set()
            if revids:  # avoid an IN () query when the batch had no revids
                existing_revids = {
                    row[0]
                    for row in (
                        session.query(Contribution.revid)
                        .filter(Contribution.revid.in_(revids))
                        .all()
                    )
                }

            batch_new = 0
            for c in contributions:
                revid = c.get("revid")
                if revid in existing_revids:
                    continue
                insert_contribution(session, c)
                batch_new += 1

            new_count += batch_new
            print(f"got {len(contributions)}, {batch_new} new")
            session.commit()

            if batch_new == 0:
                # All contributions in this batch already exist, we're caught up
                print("  Caught up with existing data")
                break

            if not continue_token:
                break

            # Be polite to the API
            time.sleep(0.5)

        total = session.query(Contribution).count()
        print(f"\nDone! {new_count} new contributions, {total} total in database")

    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


if __name__ == "__main__":
    main()