Log Wikimedia API requests

This commit is contained in:
Edward Betts 2026-05-16 09:08:30 +00:00
parent 0188cbe0bf
commit 93a4572a5d
3 changed files with 241 additions and 3 deletions

View file

@ -1,5 +1,7 @@
"""Wikidata API functions.""" """Wikidata API functions."""
import os
from pathlib import Path
import typing import typing
import urllib.parse import urllib.parse
@ -10,10 +12,37 @@ from flask import render_template
from requests.exceptions import JSONDecodeError, RequestException from requests.exceptions import JSONDecodeError, RequestException
from . import headers, mail from . import headers, mail
from .wikimedia_api_logging import WikimediaApiLogConfig, WikimediaRequestTimer
wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql" wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql"
wikidata_api_url = "https://www.wikidata.org/w/api.php"
wd_entity = "http://www.wikidata.org/entity/Q" wd_entity = "http://www.wikidata.org/entity/Q"
commons_cat_start = "https://commons.wikimedia.org/wiki/Category:" commons_cat_start = "https://commons.wikimedia.org/wiki/Category:"
# Logging configuration passed to WikimediaRequestTimer by logged_get() and
# logged_post(). The log file location is read from the
# GEOCODE_WIKIMEDIA_API_LOG environment variable; when it is unset the
# default path given below is used.
wikimedia_log_config = WikimediaApiLogConfig(
    tool="geocode",
    log_path=Path(
        os.environ.get(
            "GEOCODE_WIKIMEDIA_API_LOG", "/var/log/geocode/wikimedia-api.jsonl"
        )
    ),
    # Same User-Agent string that is sent with the requests themselves.
    user_agent=headers["User-Agent"],
)
def logged_get(url: str, **kwargs: typing.Any) -> requests.Response:
    """Send a GET request to a Wikimedia API and record a JSONL metric for it.

    All keyword arguments are forwarded unchanged to ``requests.get``. A
    completed request is logged with its status code and final URL; if the
    request raises, the timer logs the failure and the exception propagates.
    """
    with WikimediaRequestTimer(wikimedia_log_config, "GET", url) as request_timer:
        response = requests.get(url, **kwargs)
        request_timer.log_response(response.status_code, response.url)
    return response
def logged_post(url: str, **kwargs: typing.Any) -> requests.Response:
    """Send a POST request to a Wikimedia API and record a JSONL metric for it.

    All keyword arguments are forwarded unchanged to ``requests.post``. A
    completed request is logged with its status code and final URL; if the
    request raises, the timer logs the failure and the exception propagates.
    """
    with WikimediaRequestTimer(wikimedia_log_config, "POST", url) as request_timer:
        response = requests.post(url, **kwargs)
        request_timer.log_response(response.status_code, response.url)
    return response
def giveup(details: backoff.types.Details) -> None: def giveup(details: backoff.types.Details) -> None:
@ -77,8 +106,10 @@ def api_call(params: dict[str, str | int]) -> dict[str, typing.Any]:
"""Wikidata API call.""" """Wikidata API call."""
api_params: dict[str, str | int] = {"format": "json", "formatversion": 2, **params} api_params: dict[str, str | int] = {"format": "json", "formatversion": 2, **params}
try: try:
r = requests.get( r = logged_get(
"https://www.wikidata.org/w/api.php", params=api_params, headers=headers wikidata_api_url,
params=api_params,
headers=headers,
) )
json_data = typing.cast(dict[str, typing.Any], r.json()) json_data = typing.cast(dict[str, typing.Any], r.json())
except JSONDecodeError: except JSONDecodeError:
@ -144,7 +175,7 @@ Hit = dict[str, str | int | None]
@backoff.on_exception(backoff.expo, (QueryError, RequestException), max_tries=5) @backoff.on_exception(backoff.expo, (QueryError, RequestException), max_tries=5)
def wdqs(query: str) -> list[Row]: def wdqs(query: str) -> list[Row]:
"""Pass query to the Wikidata Query Service.""" """Pass query to the Wikidata Query Service."""
r = requests.post( r = logged_post(
wikidata_query_api_url, data={"query": query, "format": "json"}, headers=headers wikidata_query_api_url, data={"query": query, "format": "json"}, headers=headers
) )

View file

@ -0,0 +1,169 @@
"""JSONL logging helpers for Wikimedia API request metrics."""
import json
import logging
import os
import socket
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from types import TracebackType
from typing import Literal
from urllib.parse import parse_qs, urlparse
@dataclass(frozen=True)
class WikimediaApiLogConfig:
    """Immutable settings that control Wikimedia API request logging.

    Attributes:
        tool: Name of the tool making the requests; written to each record.
        log_path: File that the JSONL metric lines are appended to.
        user_agent: User-Agent string written to each record.
    """

    tool: str
    log_path: Path
    user_agent: str
@dataclass(frozen=True)
class WikimediaApiRequestMetric:
"""Details of one Wikimedia API request."""
tool: str
url: str
method: str
status_code: int | None
elapsed_ms: int
user_agent: str
error: str | None = None
# Loggers already configured by setup_wikimedia_api_logger(), keyed by log file.
_logger_cache: dict[Path, logging.Logger] = {}


def setup_wikimedia_api_logger(log_path: Path) -> logging.Logger:
    """Return the JSONL metrics logger for *log_path*, creating it on first use.

    The logger writes bare messages (one per line) to *log_path*, creating the
    parent directory if needed, and does not propagate to ancestor loggers.
    Loggers are cached per path, so repeated calls return the same object.

    Logging is best-effort: if the directory or file cannot be opened, a
    warning is emitted once on this module's logger and the returned logger
    discards its records instead of raising.
    """
    cached = _logger_cache.get(log_path)
    if cached is not None:
        return cached

    logger_name = f"wikimedia_api_metrics.{log_path}"
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    # Keep metric lines out of the application's ordinary log output.
    logger.propagate = False

    # The logging module keeps loggers alive by name, so one may already
    # carry a handler even when our own cache has no entry for it.
    if not logger.handlers:
        handler: logging.Handler
        try:
            log_path.parent.mkdir(parents=True, exist_ok=True)
            handler = logging.FileHandler(log_path, encoding="utf-8")
        except OSError as exc:
            # Never break API calls because metrics cannot be written, but
            # tell the operator that metrics are being dropped.
            logging.getLogger(__name__).warning(
                "Wikimedia API metrics logging disabled; cannot open %s: %s",
                log_path,
                exc,
            )
            handler = logging.NullHandler()
        handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(handler)

    _logger_cache[log_path] = logger
    return logger
def get_mediawiki_action(url: str) -> str | None:
"""Extract the MediaWiki API action from a URL, if present."""
parsed = urlparse(url)
query = parse_qs(parsed.query)
values = query.get("action")
if not values:
return None
return values[0]
def build_log_record(metric: WikimediaApiRequestMetric) -> dict[str, object]:
    """Turn one request metric into a JSON-serialisable dictionary.

    The record carries a UTC timestamp, the local hostname and process id,
    and the request details taken from *metric*. Only the host, path and
    ``action`` parameter of the URL are recorded. An ``"error"`` key is
    present only when the metric has an error set.
    """
    target = urlparse(metric.url)
    # Expands to nothing for successful requests, so the key is omitted.
    optional_error: dict[str, object] = (
        {} if metric.error is None else {"error": metric.error}
    )
    return {
        "ts": datetime.now(UTC).isoformat(),
        "tool": metric.tool,
        "host": socket.gethostname(),
        "pid": os.getpid(),
        "method": metric.method,
        "api_host": target.netloc,
        "path": target.path,
        "action": get_mediawiki_action(metric.url),
        "status_code": metric.status_code,
        "elapsed_ms": metric.elapsed_ms,
        "user_agent": metric.user_agent,
        **optional_error,
    }
def log_wikimedia_api_request(
    logger: logging.Logger,
    metric: WikimediaApiRequestMetric,
) -> None:
    """Serialise *metric* to compact JSON and emit it on *logger* at INFO level.

    Keys are sorted and no whitespace is added between separators.
    """
    line = json.dumps(build_log_record(metric), sort_keys=True, separators=(",", ":"))
    logger.info(line)
class WikimediaRequestTimer:
    """Context manager for timing and logging a Wikimedia API request.

    Timing starts when the ``with`` block is entered. Call ``log_response``
    inside the block once a response has been received; if an exception
    escapes the block instead, a failure record naming the exception class
    is logged and the exception continues to propagate.
    """

    def __init__(
        self,
        config: WikimediaApiLogConfig,
        method: str,
        url: str,
    ) -> None:
        """Store the request details and obtain the logger for ``config.log_path``."""
        self.config = config
        self.method = method
        self.url = url
        self.started = 0.0
        self.logger = setup_wikimedia_api_logger(config.log_path)

    def __enter__(self) -> "WikimediaRequestTimer":
        """Start timing a request."""
        self.started = time.monotonic()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        traceback: TracebackType | None,
    ) -> Literal[False]:
        """Log failed requests when an exception escapes.

        Always returns ``False`` so the exception is never suppressed. The
        return type is ``Literal[False]`` rather than ``bool`` so that type
        checkers know the ``with`` body cannot swallow exceptions.
        """
        if exc is not None:
            self._log(url=self.url, status_code=None, error=type(exc).__name__)
        return False

    def log_response(self, status_code: int, final_url: str | None = None) -> None:
        """Log a completed request.

        *final_url*, when given and non-empty, is recorded instead of the
        URL the timer was created with.
        """
        self._log(url=final_url or self.url, status_code=status_code)

    def _log(
        self,
        *,
        url: str,
        status_code: int | None,
        error: str | None = None,
    ) -> None:
        """Write one metric record using the time elapsed since ``__enter__``."""
        elapsed_ms = int((time.monotonic() - self.started) * 1000)
        log_wikimedia_api_request(
            self.logger,
            WikimediaApiRequestMetric(
                tool=self.config.tool,
                url=url,
                method=self.method,
                status_code=status_code,
                elapsed_ms=elapsed_ms,
                user_agent=self.config.user_agent,
                error=error,
            ),
        )

View file

@ -1,7 +1,12 @@
import json
from pathlib import Path
import pytest import pytest
import pytest_mock import pytest_mock
import requests import requests
import responses import responses
from geocode import headers
from geocode.wikimedia_api_logging import WikimediaApiLogConfig
from geocode.wikidata import ( from geocode.wikidata import (
APIResponseError, APIResponseError,
QueryError, QueryError,
@ -92,6 +97,39 @@ def test_mediawiki_error_message_falls_back_to_response_text() -> None:
assert mediawiki_error_message(response) == "Please slow down" assert mediawiki_error_message(response) == "Please slow down"
@responses.activate
def test_api_call_logs_wikimedia_request(
    mocker: pytest_mock.plugin.MockerFixture, tmp_path: Path
) -> None:
    """Check that api_call() writes a JSONL metric record for its request."""
    user_agent = headers["User-Agent"]
    metrics_file = tmp_path / "wikimedia-api.jsonl"
    test_config = WikimediaApiLogConfig(
        tool="geocode", log_path=metrics_file, user_agent=user_agent
    )
    # Redirect the module-level config so the record lands in tmp_path.
    mocker.patch("geocode.wikidata.wikimedia_log_config", test_config)
    responses.add(
        responses.GET,
        "https://www.wikidata.org/w/api.php",
        json={"entities": {"Q42": {"id": "Q42"}}},
        status=200,
    )

    api_call({"action": "wbgetentities", "ids": "Q42"})

    # Parsing the whole file as one JSON document also fails if more than
    # one line was written.
    logged = json.loads(metrics_file.read_text().strip())
    expected = {
        "tool": "geocode",
        "method": "GET",
        "api_host": "www.wikidata.org",
        "path": "/w/api.php",
        "action": "wbgetentities",
        "status_code": 200,
        "user_agent": user_agent,
    }
    for field, value in expected.items():
        assert logged[field] == value
def test_wdqs_retry(mocker: pytest_mock.plugin.MockerFixture) -> None: def test_wdqs_retry(mocker: pytest_mock.plugin.MockerFixture) -> None:
"""Test retry for WDQS API calls.""" """Test retry for WDQS API calls."""
# Patch 'time.sleep' to instantly return, effectively skipping the sleep # Patch 'time.sleep' to instantly return, effectively skipping the sleep