# depicts/depicts/mediawiki.py
"""Access MediaWiki API."""

import hashlib
import json
import os
import typing

import requests

from . import utils
from .type import CallParams, Entity

wikidata_url = "https://www.wikidata.org/w/api.php"
page_size = 50

hosts = {
    "commons": "commons.wikimedia.org",
    "enwiki": "en.wikipedia.org",
    "wikidata": "www.wikidata.org",
}


def api_call(params: CallParams, api_url: str = wikidata_url) -> requests.Response:
    """Make a GET request to the MediaWiki API."""
    call_params: CallParams = {
        "format": "json",
        "formatversion": 2,
        **params,
    }
    r = requests.get(api_url, params=call_params, timeout=5)
    return r
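
# Usage sketch, not part of the original module. Q42 is Douglas Adams'
# Wikidata item, used here purely as an illustration:
#
#     r = api_call({"action": "wbgetentities", "ids": "Q42"})
#     entity = r.json()["entities"]["Q42"]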


def api_post(params: CallParams, api_url: str = wikidata_url) -> requests.Response:
    """Make a POST request to the MediaWiki API."""
    call_params: CallParams = {
        "format": "json",
        "formatversion": 2,
        **params,
    }
    r = requests.post(api_url, data=call_params, timeout=5)
    return r


def get_list(list_name: str, **params: str | int) -> list[dict[str, typing.Any]]:
    """Return the named list from an action=query API call."""
    r = api_call({"action": "query", "list": list_name, **params})
    list_contents: list[dict[str, typing.Any]] = r.json()["query"][list_name]
    return list_contents
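
# Usage sketch with assumed parameters (not from the original code): any
# standard query list module works here, e.g. list=search with srsearch:
#
#     hits = get_list("search", srsearch="Mona Lisa", srlimit=10)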


def get_entity(qid: str, redirects: bool = False) -> Entity | None:
    """Get entity from wikibase."""
    json_data = api_call(
        {
            "action": "wbgetentities",
            "ids": qid,
            "redirects": {True: "yes", False: "no"}[redirects],
        }
    ).json()
    try:
        entity = list(json_data["entities"].values())[0]
    except KeyError:
        return None
    # The API reports unknown QIDs with a "missing" key rather than an error.
    if "missing" not in entity:
        return typing.cast(Entity, entity)
    return None
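
# Usage sketch (illustrative QID only; Q12418 is the Mona Lisa):
#
#     entity = get_entity("Q12418", redirects=True)
#     label = entity["labels"]["en"]["value"] if entity else None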


def wbgetentities(ids: typing.Iterable[str], **params: str | int) -> dict[str, Entity]:
    """Get entities from wikibase.

    The ids are sent in a single request, so callers should keep them within
    the API batch limit (see get_entities for chunked fetching).
    """
    if not ids:
        return {}
    params = {
        "action": "wbgetentities",
        "ids": "|".join(ids),
        **params,
    }
    ret: dict[str, Entity] = api_call(params).json()["entities"]
    return ret


def get_entities(ids: typing.Iterable[str], **params: str | int) -> list[Entity]:
    """Get entities from wikibase, chunking requests to page_size ids each."""
    entity_list: list[Entity] = []
    for cur in utils.chunk(ids, page_size):
        entity_list += wbgetentities(cur, **params).values()
    return entity_list


def get_entities_dict(
    ids: typing.Iterable[str], **params: str | int
) -> dict[str, Entity]:
    """Get entities from wikibase as a dict keyed by QID."""
    entities = {}
    for cur in utils.chunk(ids, page_size):
        entities.update(wbgetentities(cur, **params))
    return entities


def get_entity_with_cache(qid: str, refresh: bool = False) -> Entity | None:
    """Get an entity, reading from or writing to the local cache directory."""
    filename = f"cache/{qid}.json"
    entity: Entity | None
    if not refresh and os.path.exists(filename):
        with open(filename) as f:
            entity = json.load(f)
    else:
        entity = get_entity(qid, redirects=True)
        with open(filename, "w") as f:
            json.dump(entity, f, indent=2)
    return entity
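
# The cache helpers assume a cache/ directory relative to the working
# directory and will fail on write if it is absent. A minimal sketch to
# guarantee it exists before first use:
#
#     os.makedirs("cache", exist_ok=True)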


def get_entities_with_cache(ids: list[str], **params: typing.Any) -> list[Entity]:
    """Get entities as a list, cached under an md5 hash of the ids."""
    md5 = hashlib.md5(" ".join(ids).encode("utf-8")).hexdigest()
    entity_list: list[Entity]
    filename = f"cache/entities_{md5}.json"
    if os.path.exists(filename):
        with open(filename) as f:
            entity_list = json.load(f)
    else:
        entity_list = get_entities(ids, **params)
        with open(filename, "w") as f:
            json.dump(entity_list, f, indent=2)
    return entity_list


def get_entities_dict_with_cache(
    all_ids: list[str], **params: typing.Any
) -> dict[str, Entity]:
    """Get entities as a dict, caching each page_size chunk separately."""
    entities = {}
    for ids in utils.chunk(all_ids, page_size):
        md5 = hashlib.md5(" ".join(ids).encode("utf-8")).hexdigest()
        filename = f"cache/entities_dict_{md5}.json"
        if os.path.exists(filename):
            with open(filename) as f:
                entities.update(json.load(f))
            continue
        cur = wbgetentities(ids, **params)
        with open(filename, "w") as f:
            json.dump(cur, f, indent=2)
        entities.update(cur)
    return entities


Page = dict[str, typing.Any]


def mediawiki_query(titles: list[str], params: CallParams, site: str) -> list[Page]:
    """Mediawiki query."""
    if not titles:
        return []
    # avoid error: Too many values supplied for parameter "titles". The limit is 50.
    # FIXME: switch to utils.chunk
    if len(titles) > page_size:
        titles = titles[:page_size]
    base: CallParams = {
        "format": "json",
        "formatversion": 2,
        "action": "query",
        "continue": "",
        "titles": "|".join(titles),
    }
    p = base.copy()
    p.update(params)
    query_url = f"https://{hosts[site]}/w/api.php"
    r = requests.get(query_url, params=p, timeout=5)
    expect = "application/json; charset=utf-8"
    success = True
    if r.status_code != 200:
        print(f"status code: {r.status_code}")
        success = False
    if r.headers["content-type"] != expect:
        print(f'content-type: {r.headers["content-type"]}')
        success = False
    assert success
    json_reply = r.json()
    if "query" not in json_reply:
        # Dump the failing request for debugging before the KeyError below.
        print(r.url)
        print(r.text)
    pages: list[Page] = json_reply["query"]["pages"]
    return pages
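
# Usage sketch (illustrative title only); any prop module from action=query
# can be passed through, e.g. prop=categories:
#
#     pages = mediawiki_query(["Mona Lisa"], {"prop": "categories"}, "enwiki")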


def get_content_and_categories(title: str, site: str) -> tuple[str, list[str]]:
    """Get article contents and categories."""
    params: CallParams = {
        "prop": "revisions|categories",
        "clshow": "!hidden",
        "cllimit": "max",
        "rvprop": "content",
    }
    pages = mediawiki_query([title], params, site)
    assert len(pages) == 1
    page = pages[0]
    return (page["revisions"][0]["content"], page.get("categories", []))


def host_from_site(site: str) -> str:
    """Return the hostname for a site key such as "commons" or "enwiki"."""
    return hosts[site]


def get_history(title: str, site: str) -> list[Page]:
    """Get history of a page."""
    params: CallParams = {
        "prop": "revisions",
        "rvlimit": "max",
        "rvprop": "timestamp|user|comment|ids|content",
        "rvslots": "main",
    }
    return mediawiki_query([title], params, site)
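

# Minimal demo sketch, not part of the original module. The relative imports
# above mean the file cannot be run directly; invoke it as
# `python -m depicts.mediawiki` instead. The QID is illustrative.
#
# if __name__ == "__main__":
#     entity = get_entity_with_cache("Q42")
#     if entity:
#         print(entity["labels"]["en"]["value"])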