import abc import json import re import typing from collections import defaultdict from typing import Any import sqlalchemy from flask_login import UserMixin from geoalchemy2 import Geometry from sqlalchemy import func from sqlalchemy.dialects import postgresql from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import ( Mapped, QueryPropertyDescriptor, backref, column_property, deferred, mapped_column, registry, relationship, ) from sqlalchemy.orm.collections import attribute_mapped_collection from sqlalchemy.orm.decl_api import DeclarativeMeta from sqlalchemy.schema import Column, ForeignKey from sqlalchemy.types import BigInteger, Boolean, DateTime, Float, Integer, String, Text from . import mail, utils, wikidata, wikidata_api from .database import now_utc, session mapper_registry = registry() def cast_to_string(v: Column[int]) -> sqlalchemy.sql.elements.Cast[str]: """Cast an value to a string.""" return sqlalchemy.sql.expression.cast(v, String) class Base(metaclass=DeclarativeMeta): """Database model base class.""" __abstract__ = True registry = mapper_registry metadata = mapper_registry.metadata query: QueryPropertyDescriptor = session.query_property() __init__ = mapper_registry.constructor re_point = re.compile(r"^POINT\((.+) (.+)\)$") osm_type_enum = postgresql.ENUM( "node", "way", "relation", name="osm_type_enum", metadata=Base.metadata ) re_lau_code = re.compile(r"^[A-Z]{2}([^A-Z].+)$") # 'LAU (local administrative unit)' property_map = [ ("P238", ["iata"], "IATA airport code"), ("P239", ["icao"], "ICAO airport code"), ("P240", ["faa", "ref"], "FAA airport code"), ("P296", ["ref", "ref:train", "railway:ref"], "station code"), ("P300", ["ISO3166-2"], "ISO 3166-2 code"), ("P359", ["ref:rce"], "Rijksmonument ID"), ("P590", ["ref:gnis", "GNISID", "gnis:id", "gnis:feature_id"], "USGS GNIS ID"), ("P649", ["ref:nrhp"], "NRHP reference number"), ("P722", ["uic_ref"], "UIC station code"), ("P782", ["ref"], "LAU (local administrative unit)"), ("P836", ["ref:gss"], "UK Government Statistical Service code"), ("P856", ["website", "contact:website", "url"], "website"), ("P882", ["nist:fips_code"], "FIPS 6-4 (US counties)"), ("P901", ["ref:fips"], "FIPS 10-4 (countries and regions)"), # A UIC id can be a IBNR, but not every IBNR is an UIC id ("P954", ["uic_ref"], "IBNR ID"), ("P981", ["ref:woonplaatscode"], "BAG code for Dutch residencies"), ("P1216", ["HE_ref"], "National Heritage List for England number"), ("P2253", ["ref:edubase"], "EDUBase URN"), ("P2815", ["esr:user", "ref", "ref:train"], "ESR station code"), ("P3425", ["ref", "ref:SIC"], "Natura 2000 site ID"), ("P3562", ["seamark:light:reference"], "Admiralty number"), ( "P4755", ["ref", "ref:train", "ref:crs", "crs", "nat_ref"], "UK railway station code", ), ("P4803", ["ref", "ref:train"], "Amtrak station code"), ("P6082", ["nycdoitt:bin"], "NYC Building Identification Number"), ("P5086", ["ref"], "FIPS 5-2 alpha code (US states)"), ("P5087", ["ref:fips"], "FIPS 5-2 numeric code (US states)"), ("P5208", ["ref:bag"], "BAG building ID for Dutch buildings"), ] T = typing.TypeVar("T", bound="Item") class Item(Base): """Wikidata item.""" __tablename__ = "item" item_id = Column(Integer, primary_key=True, autoincrement=False) labels = Column(postgresql.JSONB) descriptions = Column(postgresql.JSONB) aliases = Column(postgresql.JSONB) sitelinks = Column(postgresql.JSONB) claims = Column(postgresql.JSONB, nullable=False) lastrevid = Column(Integer, nullable=False, unique=True) locations: Mapped[list["ItemLocation"]] = relationship( "ItemLocation", cascade="all, delete-orphan", backref="item" ) qid: Mapped[str] = column_property("Q" + cast_to_string(item_id)) wiki_extracts: Mapped[list["Extract"]] = relationship( "Extract", collection_class=attribute_mapped_collection("site"), cascade="save-update, merge, delete, delete-orphan", backref="item", ) extracts = association_proxy("wiki_extracts", "extract") @classmethod def get_by_qid(cls: typing.Type[T], qid: str) -> T | None: """Lookup Item via QID.""" if qid and len(qid) > 1 and qid[0].upper() == "Q" and qid[1:].isdigit(): obj: T = cls.query.get(qid[1:]) return obj return None @property def wd_url(self) -> str: """Wikidata URL for item.""" return f"https://www.wikidata.org/wiki/{self.qid}" def get_claim(self, pid: str) -> list[str | int | dict[str, str | int] | None]: """List of claims for given Wikidata property ID.""" claims = typing.cast(wikidata.Claims, self.claims) return [ i["mainsnak"]["datavalue"]["value"] if "datavalue" in i["mainsnak"] else None for i in claims.get(pid, []) ] def label(self, lang: str = "en") -> str: """Label for this Wikidata item.""" labels = typing.cast(dict[str, dict[str, Any]], self.labels) if lang in labels: return typing.cast(str, labels[lang]["value"]) elif "en" in labels: return typing.cast(str, labels["en"]["value"]) label_list = list(labels.values()) return typing.cast(str, label_list[0]["value"]) if label_list else "[no label]" def description(self, lang: str = "en") -> str | None: """Return a description of the item.""" descriptions = typing.cast(dict[str, dict[str, Any]], self.descriptions) if lang in descriptions: return typing.cast(str, descriptions[lang]["value"]) elif "en" in descriptions: return typing.cast(str, descriptions["en"]["value"]) return None d_list = list(self.descriptions.values()) if d_list: return d_list[0]["value"] def get_aliases(self, lang: str = "en") -> list[str]: """Get aliases.""" if lang not in self.aliases: if "en" not in self.aliases: return [] lang = "en" return [a["value"] for a in self.aliases[lang]] def get_part_of_names(self) -> set[str]: if not self.claims: return set() part_of_names = set() for p361 in self.claims.get("P361", []): try: part_of_id = p361["mainsnak"]["datavalue"]["value"]["numeric-id"] except KeyError: continue if part_of_id == self.item_id: continue # avoid loop for 'part of' self-reference # TODO: download item if it doesn't exist part_of_item = Item.query.get(part_of_id) if part_of_item: names = part_of_item.names(check_part_of=False) if names: part_of_names |= names.keys() return part_of_names @property def entity(self) -> wikidata_api.EntityType: """Entity.""" keys = ["labels", "aliases", "descriptions", "sitelinks", "claims"] return typing.cast( wikidata_api.EntityType, {key: getattr(self, key) for key in keys} ) def names(self, check_part_of: bool = True) -> dict[str, list[str]] | None: part_of_names = self.get_part_of_names() if check_part_of else set() d = wikidata.names_from_entity(self.entity) or defaultdict(list) for name, sources in list(d.items()): if len(sources) == 1 and sources[0][0] == "image": continue for part_of_name in part_of_names: if not name.startswith(part_of_name): continue prefix_removed = name[len(part_of_name) :].strip() if prefix_removed not in d: d[prefix_removed] = sources if self.claims: for p6375 in self.claims.get("P6375", []): try: street_address = p6375["mainsnak"]["datavalue"]["value"] except KeyError: continue d[street_address["text"]].append( ("P6375", street_address.get("language")) ) # A terrace of buildings can be illustrated with a photo of a single building. # We try to determine if this is the case and avoid using the filename of the # single building photo as a name for matching. def has_digit(s): return any(c.isdigit() for c in s) image_names = { name for name, sources in d.items() if len(sources) == 1 and sources[0][0] == "image" and has_digit(name) } if not image_names: return dict(d) or None other_names = {n for n in d.keys() if n not in image_names and has_digit(n)} for image_name in image_names: for other in other_names: if not utils.is_in_range(other, image_name): continue del d[image_name] break return dict(d) or None def get_isa(self) -> list[dict[str, int | str]]: """Get item IDs of IsA items for this item.""" isa_list = [] of_property = "P642" claims = typing.cast(wikidata.Claims, self.claims) for claim in claims.get("P31", []): qualifiers = claim.get("qualifiers", {}) if "datavalue" in claim["mainsnak"]: isa_list.append(claim["mainsnak"]["datavalue"]["value"]) for of_qualifier in qualifiers.get(of_property, []): if "datavalue" in of_qualifier: isa_list.append(of_qualifier["datavalue"]["value"]) return isa_list def get_isa_qids(self) -> list[str]: """Get QIDs of items listed instance of (P31) property.""" return [typing.cast(str, isa["id"]) for isa in self.get_isa()] def is_street(self, isa_qids: typing.Collection[str] | None = None) -> bool: """Item represents a street.""" if isa_qids is None: isa_qids = self.get_isa_qids() matching_types = { "Q12731", # dead end street "Q34442", # road "Q79007", # street "Q83620", # thoroughfare "Q21000333", # shopping street "Q62685721", # pedestrian street } return bool(matching_types & set(isa_qids)) def is_watercourse(self, isa_qids: typing.Collection[str] | None = None) -> bool: """Item represents a watercourse.""" if isa_qids is None: isa_qids = self.get_isa_qids() matching_types = { "Q355304", # watercourse "Q4022", # river "Q47521", # stream "Q1437299", # creek "Q63565252", # brook "Q12284", # canal "Q55659167", # natural watercourse } return bool(matching_types & set(isa_qids)) def is_linear_feature(self): isa_qids = set(self.get_isa_qids()) return self.is_street(isa_qids) or self.is_watercourse(isa_qids) def is_tram_stop(self) -> bool: """Item is a tram stop.""" return "Q2175765" in self.get_isa_qids() def alert_admin_about_bad_time(self, v: utils.WikibaseTime) -> None: """Send an email to admin when encountering an unparsable time in Wikibase.""" body = ( "Wikidata item has an unsupported time precision\n\n" + self.wd_url + "\n\n" + "Value:\n\n" + json.dumps(v, indent=2) ) mail.send_mail(f"OWL Map: bad time value in {self.qid}", body) def time_claim(self, pid: str) -> list[str]: """Read values from time statement.""" ret = [] for v in typing.cast(list[utils.WikibaseTime | None], self.get_claim(pid)): if not v: continue try: t = utils.format_wikibase_time(v) except Exception: self.alert_admin_about_bad_time(v) raise if t: ret.append(t) else: self.alert_admin_about_bad_time(v) return ret def closed(self) -> list[str]: """Date when item closed.""" return self.time_claim("P3999") def first_paragraph_language(self, lang: str) -> str | None: """First paragraph of Wikipedia article in the given languages.""" if lang not in self.sitelinks(): return None extract = self.extracts.get(lang) if not extract: return None assert isinstance(extract, str) empty_list = [ "

", "

\n

", "

\n\n

", "

\n\n

", "

\n\n\n

", "

.\n

", "


", '

\n

', '

\n\n

', '

\n\n\n

', ] text = extract.strip() while True: found_empty = False for empty in empty_list: if text.startswith(empty): text = text[len(empty) :].strip() found_empty = True if not found_empty: break close_tag = "

" first_end_p_tag = text.find(close_tag) if first_end_p_tag == -1: # FIXME: e-mail admin return text return text[: first_end_p_tag + len(close_tag)] def get_identifiers_tags(self) -> dict[str, list[tuple[list[str], str]]]: tags = defaultdict(list) for claim, osm_keys, label in property_map: values = [ i["mainsnak"]["datavalue"]["value"] for i in self.claims.get(claim, []) if "datavalue" in i["mainsnak"] ] if not values: continue if claim == "P782": values += [ m.group(1) for m in (re_lau_code.match(v) for v in values) if m ] for osm_key in osm_keys: tags[osm_key].append((values, label)) return dict(tags) def get_identifiers(self) -> dict[str, list[str]]: ret = {} for claim, osm_keys, label in property_map: values = [ i["mainsnak"]["datavalue"]["value"] for i in self.claims.get(claim, []) if "datavalue" in i["mainsnak"] ] if not values: continue if claim == "P782": values += [ m.group(1) for m in (re_lau_code.match(v) for v in values) if m ] for osm_key in osm_keys: ret[label] = values return ret # class Claim(Base): # __tablename__ = "claim" # item_id = Column(Integer, primary_key=True) # property_id = Column(Integer, primary_key=True) # position = Column(Integer, primary_key=True) # mainsnak = Column(postgresql.JSONB) class ItemIsA(Base): """Item IsA.""" __tablename__ = "item_isa" item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True) isa_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True) item: Mapped[Item] = relationship("Item", foreign_keys=[item_id]) isa: Mapped[Item] = relationship("Item", foreign_keys=[isa_id]) class ItemLocation(Base): """Location of an item.""" __tablename__ = "item_location" item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True) property_id = Column(Integer, primary_key=True) statement_order = Column(Integer, primary_key=True) location = Column(Geometry("POINT", srid=4326, spatial_index=True), nullable=False) qid = column_property("Q" + cast_to_string(item_id)) pid = column_property("P" + cast_to_string(property_id)) def get_lat_lon(self) -> tuple[float, float]: """Get latitude and longitude of item.""" loc: tuple[float, float] loc = session.query(func.ST_Y(self.location), func.ST_X(self.location)).one() return loc def location_objects( coords_dict: dict[str, list[wikidata.Coords]] ) -> list[ItemLocation]: """Create location objects with the given coordinates.""" locations: list[ItemLocation] = [] for pid, coord_list in coords_dict.items(): for num, coords in enumerate(coord_list): point = f"POINT({coords['longitude']} {coords['latitude']})" loc: ItemLocation = ItemLocation( property_id=int(pid[1:]), statement_order=num, location=point ) locations.append(loc) return locations class MapMixin: """Map base class.""" @declared_attr def __tablename__(cls): return "planet_osm_" + cls.__name__.lower() src_id = Column("osm_id", Integer, primary_key=True, autoincrement=False) name = Column(String) admin_level = Column(String) boundary = Column(String) tags: Mapped[postgresql.hstore] = mapped_column(postgresql.HSTORE) @declared_attr def way(cls): return deferred( Column(Geometry("GEOMETRY", srid=4326, spatial_index=True), nullable=False) ) @declared_attr def kml(cls) -> Mapped[str]: """Get object in KML format.""" return column_property(func.ST_AsKML(cls.way), deferred=True) @declared_attr def geojson_str(cls) -> Mapped[str]: """Get object as GeoJSON string.""" return column_property( func.ST_AsGeoJSON(cls.way, maxdecimaldigits=6), deferred=True ) @declared_attr def as_EWKT(cls) -> Mapped[str]: """As EWKT.""" return column_property(func.ST_AsEWKT(cls.way), deferred=True) @hybrid_property def has_street_address(self) -> bool: """Object has street address.""" return "addr:housenumber" in self.tags and "addr:street" in self.tags def display_name(self) -> str: """Name for display.""" for key in "bridge:name", "tunnel:name", "lock_name": if key in self.tags: return typing.cast(str, self.tags[key]) return typing.cast( str, self.name or self.tags.get("addr:housename") or self.tags.get("inscription"), ) def geojson(self) -> dict[str, typing.Any]: """Object GeoJSON parsed into Python data structure.""" return typing.cast(dict[str, typing.Any], json.loads(self.geojson_str)) def get_centroid(self) -> tuple[float, float]: """Centroid.""" centroid = session.query(func.ST_AsText(func.ST_Centroid(self.way))).scalar() assert centroid assert (m := re_point.match(centroid)) lon, lat = m.groups() return (float(lat), float(lon)) @classmethod def coords_within(cls, lat: float, lon: float): point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), 4326) return cls.query.filter( cls.admin_level.isnot(None), func.ST_Within(point, cls.way) ).order_by(cls.area) @property def id(self) -> int: """OSM id.""" return abs(self.src_id) # relations have negative IDs @property @abc.abstractmethod def type(self) -> str: """OSM type.""" @property def identifier(self) -> str: """OSM identifier.""" return f"{self.type}/{self.id}" @property def osm_url(self): """OSM URL.""" return f"https://www.openstreetmap.org/{self.type}/{self.id}" class Point(MapMixin, Base): """OSM planet point.""" type = "node" class Line(MapMixin, Base): """OSM planet line.""" @property def type(self) -> str: """OSM type.""" return "way" if self.src_id > 0 else "relation" @classmethod def get_osm(cls, osm_type, osm_id): src_id = osm_id * {"way": 1, "relation": -1}[osm_type] return cls.query.get(src_id) class Polygon(MapMixin, Base): """OSM planet polygon.""" way_area = Column(Float) @classmethod def get_osm(cls, osm_type, osm_id): src_id = osm_id * {"way": 1, "relation": -1}[osm_type] return cls.query.get(src_id) @property def type(self) -> str: """Polygon is either a way or a relation.""" return "way" if self.src_id > 0 else "relation" @declared_attr def area(cls) -> sqlalchemy.orm.properties.ColumnProperty: """Polygon area.""" return column_property(func.ST_Area(cls.way, False), deferred=True) @hybrid_property def area_in_sq_km(self) -> float: """Size of area in square km.""" area: float = self.area return area / (1000 * 1000) class User(Base, UserMixin): """User.""" __tablename__ = "user" id = Column(Integer, primary_key=True) username = Column(String) password = Column(String) name = Column(String) email = Column(String) active = Column(Boolean, default=True) sign_up = Column(DateTime, default=now_utc()) is_admin = Column(Boolean, default=False) description = Column(Text) img = Column(String) # OSM avatar languages = Column(postgresql.ARRAY(String)) single = Column(String) multi = Column(String) units = Column(String) wikipedia_tag = Column(Boolean, default=False) mock_upload = Column(Boolean, default=False) osm_id = Column(Integer, index=True) osm_account_created = Column(DateTime) osm_oauth_token = Column(String) osm_oauth_token_secret = Column(String) def is_active(self) -> bool: """User is active.""" return self.active class EditSession(Base): """Edit session.""" __tablename__ = "edit_session" id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey(User.id)) created = Column(DateTime, default=now_utc(), nullable=False) edit_list = Column(postgresql.JSONB) comment = Column(String) user: Mapped[User] = relationship("User") changeset: Mapped["Changeset"] = relationship( "Changeset", back_populates="edit_session", uselist=False ) class Changeset(Base): """An OSM Changeset generated by this tool.""" __tablename__ = "changeset" id = Column(BigInteger, primary_key=True) created = Column(DateTime) comment = Column(String) user_id = Column(Integer, ForeignKey(User.id)) update_count = Column(Integer, nullable=False) edit_session_id = Column(Integer, ForeignKey(EditSession.id)) user: Mapped[User] = relationship( "User", backref=backref( "changesets", lazy="dynamic", order_by="Changeset.created.desc()" ), ) edit_session: Mapped[EditSession] = relationship( "EditSession", back_populates="changeset" ) class ChangesetEdit(Base): """Record details of edits within a changeset.""" __tablename__ = "changeset_edit" changeset_id = Column(BigInteger, ForeignKey("changeset.id"), primary_key=True) item_id = Column(Integer, primary_key=True) osm_id = Column(BigInteger, primary_key=True) osm_type = Column(osm_type_enum, primary_key=True) saved = Column(DateTime, default=now_utc(), nullable=False) changeset: Mapped[Changeset] = relationship( "Changeset", backref=backref("edits", lazy="dynamic") ) class SkipIsA(Base): """Ignore this item type when walking the Wikidata subclass graph.""" __tablename__ = "skip_isa" item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True) qid = column_property("Q" + cast_to_string(item_id)) item: Mapped[Item] = relationship("Item") class ItemExtraKeys(Base): """Extra tag or key to consider for an Wikidata item type.""" __tablename__ = "item_extra_keys" item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True) tag_or_key = Column(String, primary_key=True) note = Column(String) qid = column_property("Q" + cast_to_string(item_id)) item: Mapped[Item] = relationship("Item") class Extract(Base): """First paragraph from Wikipedia.""" __tablename__ = "extract" item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True) site = Column(String, primary_key=True) extract = Column(String, nullable=False) def __init__(self, site: str, extract: str): """Initialise the object.""" self.site = site self.extract = extract