forked from edward/owl-map
Add types and docstrings
This commit is contained in:
parent
3f7df22083
commit
980737753f
|
@ -1,52 +1,55 @@
|
||||||
from collections import defaultdict
|
"""Process Wikidata items to extract coordinates and names."""
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
import typing
|
||||||
|
from collections import defaultdict
|
||||||
|
from typing import Any, cast
|
||||||
|
|
||||||
|
from . import wikidata_api
|
||||||
|
|
||||||
hq_pid = "P159"
|
hq_pid = "P159"
|
||||||
coords_pid = "P625"
|
coords_pid = "P625"
|
||||||
|
|
||||||
|
Claims = dict[str, list[dict[str, Any]]]
|
||||||
|
|
||||||
def read_coords(snak):
|
|
||||||
|
class Coords(typing.TypedDict):
|
||||||
|
"""Coordinates."""
|
||||||
|
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
|
||||||
|
|
||||||
|
def read_coords(snak: dict[str, Any]) -> Coords | None:
|
||||||
|
"""Read coordinates from snak."""
|
||||||
try:
|
try:
|
||||||
v = snak["datavalue"]["value"]
|
v = snak["datavalue"]["value"]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
return
|
return None
|
||||||
if v["globe"].rpartition("/")[2] != "Q2":
|
if v["globe"].rpartition("/")[2] != "Q2":
|
||||||
return
|
return None
|
||||||
|
|
||||||
return {k: v[k] for k in ("latitude", "longitude")}
|
return cast(Coords, {k: v[k] for k in ("latitude", "longitude")})
|
||||||
|
|
||||||
|
|
||||||
def read_hq_coords(claims):
|
def read_hq_coords(claims: Claims) -> list[Coords]:
|
||||||
if hq_pid not in claims:
|
"""Coordinates of item headquarters."""
|
||||||
return []
|
found: list[Coords] = []
|
||||||
|
for hq_claim in claims.get(hq_pid, []):
|
||||||
found = []
|
for snak in hq_claim.get("qualifiers", {}).get(coords_pid, []):
|
||||||
for hq_claim in claims[hq_pid]:
|
if coords := read_coords(snak):
|
||||||
if "qualifiers" not in hq_claim:
|
|
||||||
continue
|
|
||||||
if coords_pid not in hq_claim["qualifiers"]:
|
|
||||||
continue
|
|
||||||
for snak in hq_claim["qualifiers"][coords_pid]:
|
|
||||||
coords = read_coords(snak)
|
|
||||||
if coords:
|
|
||||||
found.append(coords)
|
found.append(coords)
|
||||||
|
|
||||||
return found
|
return found
|
||||||
|
|
||||||
|
|
||||||
def read_location_statement(claims, pid):
|
def read_location_statement(claims: Claims, pid: str) -> list[Coords]:
|
||||||
if pid not in claims:
|
"""Get coordinates from given claim."""
|
||||||
return []
|
return [i for i in (read_coords(c["mainsnak"]) for c in claims.get(pid, [])) if i]
|
||||||
|
|
||||||
found = []
|
|
||||||
for statement in claims[pid]:
|
|
||||||
coords = read_coords(statement["mainsnak"])
|
|
||||||
if coords:
|
|
||||||
found.append(coords)
|
|
||||||
return found
|
|
||||||
|
|
||||||
|
|
||||||
def get_entity_coords(claims):
|
def get_entity_coords(claims: Claims) -> dict[str, Any]:
|
||||||
|
"""Read entity coordinate locations from claims dict."""
|
||||||
assert "claims" not in claims # make sure we weren't passed entity by mistake
|
assert "claims" not in claims # make sure we weren't passed entity by mistake
|
||||||
ret = {
|
ret = {
|
||||||
coords_pid: read_location_statement(claims, coords_pid),
|
coords_pid: read_location_statement(claims, coords_pid),
|
||||||
|
@ -54,22 +57,26 @@ def get_entity_coords(claims):
|
||||||
}
|
}
|
||||||
return {pid: values for pid, values in ret.items() if values}
|
return {pid: values for pid, values in ret.items() if values}
|
||||||
|
|
||||||
def names_from_entity(entity, skip_lang=None):
|
|
||||||
|
def names_from_entity(
|
||||||
|
entity: wikidata_api.EntityType, skip_lang: set[str] | None = None
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Find all sources of names from Item."""
|
||||||
if skip_lang is None:
|
if skip_lang is None:
|
||||||
skip_lang = set()
|
skip_lang = set()
|
||||||
|
|
||||||
ret = defaultdict(list)
|
ret = defaultdict(list)
|
||||||
cat_start = 'Category:'
|
cat_start = "Category:"
|
||||||
|
|
||||||
for k, v in entity['labels'].items():
|
for k, v in entity["labels"].items():
|
||||||
if k in skip_lang:
|
if k in skip_lang:
|
||||||
continue
|
continue
|
||||||
ret[v['value']].append(('label', k))
|
ret[v["value"]].append(("label", k))
|
||||||
|
|
||||||
for k, v in entity['sitelinks'].items():
|
for k, v in entity["sitelinks"].items():
|
||||||
if k + 'wiki' in skip_lang:
|
if k + "wiki" in skip_lang:
|
||||||
continue
|
continue
|
||||||
title = v['title']
|
title = v["title"]
|
||||||
if title.startswith(cat_start):
|
if title.startswith(cat_start):
|
||||||
title = title[len(cat_start) :]
|
title = title[len(cat_start) :]
|
||||||
|
|
||||||
|
@ -79,48 +86,48 @@ def names_from_entity(entity, skip_lang=None):
|
||||||
if lc_first_title in ret:
|
if lc_first_title in ret:
|
||||||
title = lc_first_title
|
title = lc_first_title
|
||||||
|
|
||||||
ret[title].append(('sitelink', k))
|
ret[title].append(("sitelink", k))
|
||||||
|
|
||||||
for lang, value_list in entity.get('aliases', {}).items():
|
for lang, value_list in entity.get("aliases", {}).items():
|
||||||
if lang in skip_lang or len(value_list) > 3:
|
if lang in skip_lang or len(value_list) > 3:
|
||||||
continue
|
continue
|
||||||
for name in value_list:
|
for name in value_list:
|
||||||
ret[name['value']].append(('alias', lang))
|
ret[name["value"]].append(("alias", lang))
|
||||||
|
|
||||||
commonscats = entity.get('claims', {}).get('P373', [])
|
commonscats = entity.get("claims", {}).get("P373", [])
|
||||||
for i in commonscats:
|
for i in commonscats:
|
||||||
if 'datavalue' not in i['mainsnak']:
|
if "datavalue" not in i["mainsnak"]:
|
||||||
continue
|
continue
|
||||||
value = i['mainsnak']['datavalue']['value']
|
value = i["mainsnak"]["datavalue"]["value"]
|
||||||
ret[value].append(('commonscat', None))
|
ret[value].append(("commonscat", None))
|
||||||
|
|
||||||
officialname = entity.get('claims', {}).get('P1448', [])
|
officialname = entity.get("claims", {}).get("P1448", [])
|
||||||
for i in officialname:
|
for i in officialname:
|
||||||
if 'datavalue' not in i['mainsnak']:
|
if "datavalue" not in i["mainsnak"]:
|
||||||
continue
|
continue
|
||||||
value = i['mainsnak']['datavalue']['value']
|
value = i["mainsnak"]["datavalue"]["value"]
|
||||||
ret[value['text']].append(('officialname', value['language']))
|
ret[value["text"]].append(("officialname", value["language"]))
|
||||||
|
|
||||||
nativelabel = entity.get('claims', {}).get('P1705', [])
|
nativelabel = entity.get("claims", {}).get("P1705", [])
|
||||||
for i in nativelabel:
|
for i in nativelabel:
|
||||||
if 'datavalue' not in i['mainsnak']:
|
if "datavalue" not in i["mainsnak"]:
|
||||||
continue
|
continue
|
||||||
value = i['mainsnak']['datavalue']['value']
|
value = i["mainsnak"]["datavalue"]["value"]
|
||||||
ret[value['text']].append(('nativelabel', value['language']))
|
ret[value["text"]].append(("nativelabel", value["language"]))
|
||||||
|
|
||||||
image = entity.get('claims', {}).get('P18', [])
|
image = entity.get("claims", {}).get("P18", [])
|
||||||
for i in image:
|
for i in image:
|
||||||
if 'datavalue' not in i['mainsnak']:
|
if "datavalue" not in i["mainsnak"]:
|
||||||
continue
|
continue
|
||||||
value = i['mainsnak']['datavalue']['value']
|
value = i["mainsnak"]["datavalue"]["value"]
|
||||||
m = re.search(r'\.[a-z]{3,4}$', value)
|
m = re.search(r"\.[a-z]{3,4}$", value)
|
||||||
if m:
|
if m:
|
||||||
value = value[: m.start()]
|
value = value[: m.start()]
|
||||||
for pattern in r' - geograph\.org\.uk - \d+$', r'[, -]*0\d{2,}$':
|
for pattern in r" - geograph\.org\.uk - \d+$", r"[, -]*0\d{2,}$":
|
||||||
m = re.search(pattern, value)
|
m = re.search(pattern, value)
|
||||||
if m:
|
if m:
|
||||||
value = value[: m.start()]
|
value = value[: m.start()]
|
||||||
break
|
break
|
||||||
ret[value].append(('image', None))
|
ret[value].append(("image", None))
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
Loading…
Reference in a new issue