forked from edward/owl-map
Merge branch 'main' of github.com:EdwardBetts/owl-map
This commit is contained in:
commit
33bda7f54b
12 changed files with 1335 additions and 281 deletions
|
|
@ -4,7 +4,9 @@
|
|||
CallParams = dict[str, str | int]
|
||||
|
||||
user_agent = (
|
||||
"osm-wikidata/0.1 (https://github.com/EdwardBetts/osm-wikidata; edward@4angle.com)"
|
||||
"osm-wikidata/0.2"
|
||||
+ " (https://github.com/EdwardBetts/osm-wikidata;"
|
||||
+ " edward@4angle.com)"
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -40,13 +40,14 @@ skip_tags = {
|
|||
}
|
||||
|
||||
|
||||
def get_country_iso3166_1(lat: float, lon: float) -> set[str]:
|
||||
def get_country_iso3166_1(lat, lon):
|
||||
"""For a given lat/lon return a set of ISO country codes.
|
||||
|
||||
Also cache the country code in the global object.
|
||||
|
||||
Normally there should be only one country.
|
||||
"""
|
||||
|
||||
point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), srid)
|
||||
alpha2_codes = set()
|
||||
q = model.Polygon.query.filter(
|
||||
|
|
@ -263,7 +264,7 @@ WHERE tags ? 'wikidata'
|
|||
conn = database.session.connection()
|
||||
result = conn.execute(text(sql))
|
||||
|
||||
print(sql)
|
||||
# print(sql)
|
||||
|
||||
point_sql = (
|
||||
f"""
|
||||
|
|
@ -788,7 +789,7 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
|
|||
if limit:
|
||||
s = s.limit(limit)
|
||||
|
||||
print(s.compile(compile_kwargs={"literal_binds": True}))
|
||||
# print(s.compile(compile_kwargs={"literal_binds": True}))
|
||||
|
||||
conn = database.session.connection()
|
||||
nearby = []
|
||||
|
|
@ -1031,7 +1032,7 @@ def isa_incremental_search(search_terms):
|
|||
func.length(en_label) < 20,
|
||||
)
|
||||
|
||||
print(q.statement.compile(compile_kwargs={"literal_binds": True}))
|
||||
# print(q.statement.compile(compile_kwargs={"literal_binds": True}))
|
||||
|
||||
ret = []
|
||||
for item in q:
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
from flask import current_app, g, request, has_request_context
|
||||
import smtplib
|
||||
import sys
|
||||
import traceback
|
||||
from email.mime.text import MIMEText
|
||||
from email.utils import formatdate, make_msgid
|
||||
from pprint import pformat
|
||||
import smtplib
|
||||
import traceback
|
||||
import sys
|
||||
|
||||
from flask import current_app, g, has_request_context, request
|
||||
|
||||
|
||||
def send_mail(subject, body, config=None):
|
||||
|
|
@ -71,7 +72,7 @@ def open_changeset_error(session_id, changeset, r):
|
|||
username = g.user.username
|
||||
body = f"""
|
||||
user: {username}
|
||||
page: {url}
|
||||
page: {r.url}
|
||||
|
||||
message user: https://www.openstreetmap.org/message/new/{username}
|
||||
|
||||
|
|
|
|||
430
matcher/model.py
430
matcher/model.py
|
|
@ -1,89 +1,159 @@
|
|||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.schema import ForeignKey, Column
|
||||
from sqlalchemy.orm import relationship, column_property, deferred, backref
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.types import Integer, String, Float, Boolean, DateTime, Text, BigInteger
|
||||
from sqlalchemy.dialects import postgresql
|
||||
from sqlalchemy.sql.expression import cast
|
||||
from sqlalchemy.ext.hybrid import hybrid_property
|
||||
from sqlalchemy.ext.declarative import declared_attr
|
||||
from geoalchemy2 import Geometry
|
||||
from collections import defaultdict
|
||||
from flask_login import UserMixin
|
||||
from .database import session, now_utc
|
||||
from . import wikidata, utils, mail
|
||||
import json
|
||||
import re
|
||||
import typing
|
||||
from collections import defaultdict
|
||||
from typing import Any
|
||||
|
||||
from flask_login import UserMixin
|
||||
from geoalchemy2 import Geometry
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.dialects import postgresql
|
||||
from sqlalchemy.ext.associationproxy import association_proxy
|
||||
from sqlalchemy.ext.declarative import declarative_base, declared_attr
|
||||
from sqlalchemy.ext.hybrid import hybrid_property
|
||||
from sqlalchemy.orm import backref, column_property, deferred, relationship
|
||||
from sqlalchemy.orm.collections import attribute_mapped_collection
|
||||
from sqlalchemy.schema import Column, ForeignKey
|
||||
from sqlalchemy.sql.expression import cast
|
||||
from sqlalchemy.types import BigInteger, Boolean, DateTime, Float, Integer, String, Text
|
||||
|
||||
from . import mail, utils, wikidata
|
||||
from .database import now_utc, session
|
||||
|
||||
Base = declarative_base()
|
||||
Base.query = session.query_property()
|
||||
|
||||
re_point = re.compile(r'^POINT\((.+) (.+)\)$')
|
||||
re_point = re.compile(r"^POINT\((.+) (.+)\)$")
|
||||
|
||||
osm_type_enum = postgresql.ENUM(
|
||||
"node", "way", "relation", name="osm_type_enum", metadata=Base.metadata
|
||||
)
|
||||
|
||||
re_lau_code = re.compile(r"^[A-Z]{2}([^A-Z].+)$") # 'LAU (local administrative unit)'
|
||||
|
||||
property_map = [
|
||||
("P238", ["iata"], "IATA airport code"),
|
||||
("P239", ["icao"], "ICAO airport code"),
|
||||
("P240", ["faa", "ref"], "FAA airport code"),
|
||||
("P296", ["ref", "ref:train", "railway:ref"], "station code"),
|
||||
("P300", ["ISO3166-2"], "ISO 3166-2 code"),
|
||||
("P359", ["ref:rce"], "Rijksmonument ID"),
|
||||
("P590", ["ref:gnis", "GNISID", "gnis:id", "gnis:feature_id"], "USGS GNIS ID"),
|
||||
("P649", ["ref:nrhp"], "NRHP reference number"),
|
||||
("P722", ["uic_ref"], "UIC station code"),
|
||||
("P782", ["ref"], "LAU (local administrative unit)"),
|
||||
("P836", ["ref:gss"], "UK Government Statistical Service code"),
|
||||
("P856", ["website", "contact:website", "url"], "website"),
|
||||
("P882", ["nist:fips_code"], "FIPS 6-4 (US counties)"),
|
||||
("P901", ["ref:fips"], "FIPS 10-4 (countries and regions)"),
|
||||
# A UIC id can be a IBNR, but not every IBNR is an UIC id
|
||||
("P954", ["uic_ref"], "IBNR ID"),
|
||||
("P981", ["ref:woonplaatscode"], "BAG code for Dutch residencies"),
|
||||
("P1216", ["HE_ref"], "National Heritage List for England number"),
|
||||
("P2253", ["ref:edubase"], "EDUBase URN"),
|
||||
("P2815", ["esr:user", "ref", "ref:train"], "ESR station code"),
|
||||
("P3425", ["ref", "ref:SIC"], "Natura 2000 site ID"),
|
||||
("P3562", ["seamark:light:reference"], "Admiralty number"),
|
||||
(
|
||||
"P4755",
|
||||
["ref", "ref:train", "ref:crs", "crs", "nat_ref"],
|
||||
"UK railway station code",
|
||||
),
|
||||
("P4803", ["ref", "ref:train"], "Amtrak station code"),
|
||||
("P6082", ["nycdoitt:bin"], "NYC Building Identification Number"),
|
||||
("P5086", ["ref"], "FIPS 5-2 alpha code (US states)"),
|
||||
("P5087", ["ref:fips"], "FIPS 5-2 numeric code (US states)"),
|
||||
("P5208", ["ref:bag"], "BAG building ID for Dutch buildings"),
|
||||
]
|
||||
|
||||
T = typing.TypeVar("T", bound="Item")
|
||||
|
||||
osm_type_enum = postgresql.ENUM('node', 'way', 'relation',
|
||||
name='osm_type_enum',
|
||||
metadata=Base.metadata)
|
||||
|
||||
class Item(Base):
|
||||
"""Wikidata item."""
|
||||
|
||||
__tablename__ = "item"
|
||||
item_id = Column(Integer, primary_key=True, autoincrement=False)
|
||||
labels = Column(postgresql.JSONB)
|
||||
descriptions = Column(postgresql.JSONB)
|
||||
aliases = Column(postgresql.JSONB)
|
||||
sitelinks = Column(postgresql.JSONB)
|
||||
claims = Column(postgresql.JSONB)
|
||||
claims = Column(postgresql.JSONB, nullable=False)
|
||||
lastrevid = Column(Integer, nullable=False, unique=True)
|
||||
locations = relationship("ItemLocation", cascade="all, delete-orphan", backref="item")
|
||||
locations = relationship(
|
||||
"ItemLocation", cascade="all, delete-orphan", backref="item"
|
||||
)
|
||||
qid = column_property("Q" + cast(item_id, String))
|
||||
|
||||
wiki_extracts = relationship(
|
||||
"Extract",
|
||||
collection_class=attribute_mapped_collection("site"),
|
||||
cascade="save-update, merge, delete, delete-orphan",
|
||||
backref="item",
|
||||
)
|
||||
extracts = association_proxy("wiki_extracts", "extract")
|
||||
|
||||
@classmethod
|
||||
def get_by_qid(cls, qid):
|
||||
def get_by_qid(cls: typing.Type[T], qid: str) -> T | None:
|
||||
if qid and len(qid) > 1 and qid[0].upper() == "Q" and qid[1:].isdigit():
|
||||
return cls.query.get(qid[1:])
|
||||
obj: T = cls.query.get(qid[1:])
|
||||
return obj
|
||||
return None
|
||||
|
||||
@property
|
||||
def wd_url(self):
|
||||
def wd_url(self) -> str:
|
||||
"""Wikidata URL for item."""
|
||||
return f"https://www.wikidata.org/wiki/{self.qid}"
|
||||
|
||||
def get_claim(self, pid):
|
||||
return [i["mainsnak"]["datavalue"]["value"] if "datavalue" in i["mainsnak"] else None
|
||||
for i in self.claims.get(pid, [])]
|
||||
def get_claim(self, pid: str) -> list[dict[str, Any] | None]:
|
||||
"""List of claims for given Wikidata property ID."""
|
||||
claims = typing.cast(dict[str, list[dict[str, Any]]], self.claims)
|
||||
return [
|
||||
i["mainsnak"]["datavalue"]["value"]
|
||||
if "datavalue" in i["mainsnak"]
|
||||
else None
|
||||
for i in claims.get(pid, [])
|
||||
]
|
||||
|
||||
def label(self, lang='en'):
|
||||
if lang in self.labels:
|
||||
return self.labels[lang]['value']
|
||||
elif 'en' in self.labels:
|
||||
return self.labels['en']['value']
|
||||
def label(self, lang: str = "en") -> str:
|
||||
"""Label for this Wikidata item."""
|
||||
labels = typing.cast(dict[str, dict[str, Any]], self.labels)
|
||||
if lang in labels:
|
||||
return typing.cast(str, labels[lang]["value"])
|
||||
elif "en" in labels:
|
||||
return typing.cast(str, labels["en"]["value"])
|
||||
|
||||
label_list = list(self.labels.values())
|
||||
return label_list[0]['value'] if label_list else '[no label]'
|
||||
label_list = list(labels.values())
|
||||
return typing.cast(str, label_list[0]["value"]) if label_list else "[no label]"
|
||||
|
||||
def description(self, lang='en'):
|
||||
if lang in self.descriptions:
|
||||
return self.descriptions[lang]['value']
|
||||
elif 'en' in self.descriptions:
|
||||
return self.descriptions['en']['value']
|
||||
return
|
||||
def description(self, lang: str = "en") -> str | None:
|
||||
"""Return a description of the item."""
|
||||
descriptions = typing.cast(dict[str, dict[str, Any]], self.descriptions)
|
||||
if lang in descriptions:
|
||||
return typing.cast(str, descriptions[lang]["value"])
|
||||
elif "en" in descriptions:
|
||||
return typing.cast(str, descriptions["en"]["value"])
|
||||
return None
|
||||
|
||||
d_list = list(self.descriptions.values())
|
||||
if d_list:
|
||||
return d_list[0]['value']
|
||||
return d_list[0]["value"]
|
||||
|
||||
def get_aliases(self, lang='en'):
|
||||
def get_aliases(self, lang="en"):
|
||||
if lang not in self.aliases:
|
||||
if 'en' not in self.aliases:
|
||||
if "en" not in self.aliases:
|
||||
return []
|
||||
lang = 'en'
|
||||
return [a['value'] for a in self.aliases[lang]]
|
||||
lang = "en"
|
||||
return [a["value"] for a in self.aliases[lang]]
|
||||
|
||||
def get_part_of_names(self):
|
||||
if not self.claims:
|
||||
return set()
|
||||
|
||||
part_of_names = set()
|
||||
for p361 in self.claims.get('P361', []):
|
||||
for p361 in self.claims.get("P361", []):
|
||||
try:
|
||||
part_of_id = p361['mainsnak']['datavalue']['value']['numeric-id']
|
||||
part_of_id = p361["mainsnak"]["datavalue"]["value"]["numeric-id"]
|
||||
except KeyError:
|
||||
continue
|
||||
if part_of_id == self.item_id:
|
||||
|
|
@ -98,7 +168,7 @@ class Item(Base):
|
|||
|
||||
@property
|
||||
def entity(self):
|
||||
keys = ['labels', 'aliases', 'descriptions', 'sitelinks', 'claims']
|
||||
keys = ["labels", "aliases", "descriptions", "sitelinks", "claims"]
|
||||
return {key: getattr(self, key) for key in keys}
|
||||
|
||||
def names(self, check_part_of=True):
|
||||
|
|
@ -107,22 +177,24 @@ class Item(Base):
|
|||
d = wikidata.names_from_entity(self.entity) or defaultdict(list)
|
||||
|
||||
for name, sources in list(d.items()):
|
||||
if len(sources) == 1 and sources[0][0] == 'image':
|
||||
if len(sources) == 1 and sources[0][0] == "image":
|
||||
continue
|
||||
for part_of_name in part_of_names:
|
||||
if not name.startswith(part_of_name):
|
||||
continue
|
||||
prefix_removed = name[len(part_of_name):].strip()
|
||||
prefix_removed = name[len(part_of_name) :].strip()
|
||||
if prefix_removed not in d:
|
||||
d[prefix_removed] = sources
|
||||
|
||||
if self.claims:
|
||||
for p6375 in self.claims.get('P6375', []):
|
||||
for p6375 in self.claims.get("P6375", []):
|
||||
try:
|
||||
street_address = p6375['mainsnak']['datavalue']['value']
|
||||
street_address = p6375["mainsnak"]["datavalue"]["value"]
|
||||
except KeyError:
|
||||
continue
|
||||
d[street_address['text']].append(('P6375', street_address.get('language')))
|
||||
d[street_address["text"]].append(
|
||||
("P6375", street_address.get("language"))
|
||||
)
|
||||
|
||||
# A terrace of buildings can be illustrated with a photo of a single building.
|
||||
# We try to determine if this is the case and avoid using the filename of the
|
||||
|
|
@ -131,8 +203,11 @@ class Item(Base):
|
|||
def has_digit(s):
|
||||
return any(c.isdigit() for c in s)
|
||||
|
||||
image_names = {name for name, sources in d.items()
|
||||
if len(sources) == 1 and sources[0][0] == 'image' and has_digit(name)}
|
||||
image_names = {
|
||||
name
|
||||
for name, sources in d.items()
|
||||
if len(sources) == 1 and sources[0][0] == "image" and has_digit(name)
|
||||
}
|
||||
if not image_names:
|
||||
return dict(d) or None
|
||||
|
||||
|
|
@ -166,10 +241,10 @@ class Item(Base):
|
|||
isa_qids = self.get_isa_qids()
|
||||
|
||||
matching_types = {
|
||||
"Q12731", # dead end street
|
||||
"Q34442", # road
|
||||
"Q79007", # street
|
||||
"Q83620", # thoroughfare
|
||||
"Q12731", # dead end street
|
||||
"Q34442", # road
|
||||
"Q79007", # street
|
||||
"Q83620", # thoroughfare
|
||||
"Q21000333", # shopping street
|
||||
"Q62685721", # pedestrian street
|
||||
}
|
||||
|
|
@ -179,14 +254,13 @@ class Item(Base):
|
|||
if isa_qids is None:
|
||||
isa_qids = self.get_isa_qids()
|
||||
matching_types = {
|
||||
"Q355304", # watercourse
|
||||
"Q4022", # river
|
||||
"Q47521", # stream
|
||||
"Q1437299", # creek
|
||||
"Q355304", # watercourse
|
||||
"Q4022", # river
|
||||
"Q47521", # stream
|
||||
"Q1437299", # creek
|
||||
"Q63565252", # brook
|
||||
"Q12284", # canal
|
||||
"Q12284", # canal
|
||||
"Q55659167", # natural watercourse
|
||||
|
||||
}
|
||||
return bool(matching_types & set(isa_qids))
|
||||
|
||||
|
|
@ -195,19 +269,29 @@ class Item(Base):
|
|||
return self.is_street(isa_qids) or self.is_watercourse(isa_qids)
|
||||
|
||||
def is_tram_stop(self):
|
||||
return 'Q2175765' in self.get_isa_qids()
|
||||
return "Q2175765" in self.get_isa_qids()
|
||||
|
||||
def alert_admin_about_bad_time(self, v):
|
||||
body = ("Wikidata item has an unsupported time precision\n\n"
|
||||
+ self.wd_url + "\n\n" + "Value:\n\n" + json.dumps(v, indent=2))
|
||||
body = (
|
||||
"Wikidata item has an unsupported time precision\n\n"
|
||||
+ self.wd_url
|
||||
+ "\n\n"
|
||||
+ "Value:\n\n"
|
||||
+ json.dumps(v, indent=2)
|
||||
)
|
||||
mail.send_mail(f"OWL Map: bad time value in {self.qid}", body)
|
||||
|
||||
def closed(self):
|
||||
def time_claim(self, pid):
|
||||
ret = []
|
||||
for v in self.get_claim("P3999"):
|
||||
for v in self.get_claim(pid):
|
||||
if not v:
|
||||
continue
|
||||
t = utils.format_wikibase_time(v)
|
||||
try:
|
||||
t = utils.format_wikibase_time(v)
|
||||
except Exception:
|
||||
self.alert_admin_about_bad_time(v)
|
||||
raise
|
||||
|
||||
if t:
|
||||
ret.append(t)
|
||||
else:
|
||||
|
|
@ -215,6 +299,84 @@ class Item(Base):
|
|||
|
||||
return ret
|
||||
|
||||
def closed(self):
|
||||
return self.time_claim("P3999")
|
||||
|
||||
def first_paragraph_language(self, lang):
|
||||
if lang not in self.sitelinks():
|
||||
return
|
||||
extract = self.extracts.get(lang)
|
||||
if not extract:
|
||||
return
|
||||
|
||||
empty_list = [
|
||||
"<p><span></span></p>",
|
||||
"<p><span></span>\n</p>",
|
||||
"<p><span></span>\n\n</p>",
|
||||
"<p>\n<span></span>\n</p>",
|
||||
"<p>\n\n<span></span>\n</p>",
|
||||
"<p>.\n</p>",
|
||||
"<p><br></p>",
|
||||
'<p class="mw-empty-elt">\n</p>',
|
||||
'<p class="mw-empty-elt">\n\n</p>',
|
||||
'<p class="mw-empty-elt">\n\n\n</p>',
|
||||
]
|
||||
|
||||
text = extract.strip()
|
||||
while True:
|
||||
found_empty = False
|
||||
for empty in empty_list:
|
||||
if text.startswith(empty):
|
||||
text = text[len(empty) :].strip()
|
||||
found_empty = True
|
||||
if not found_empty:
|
||||
break
|
||||
|
||||
close_tag = "</p>"
|
||||
first_end_p_tag = text.find(close_tag)
|
||||
if first_end_p_tag == -1:
|
||||
# FIXME: e-mail admin
|
||||
return text
|
||||
|
||||
return text[: first_end_p_tag + len(close_tag)]
|
||||
|
||||
def get_identifiers_tags(self):
|
||||
tags = defaultdict(list)
|
||||
for claim, osm_keys, label in property_map:
|
||||
values = [
|
||||
i["mainsnak"]["datavalue"]["value"]
|
||||
for i in self.claims.get(claim, [])
|
||||
if "datavalue" in i["mainsnak"]
|
||||
]
|
||||
if not values:
|
||||
continue
|
||||
if claim == "P782":
|
||||
values += [
|
||||
m.group(1) for m in (re_lau_code.match(v) for v in values) if m
|
||||
]
|
||||
for osm_key in osm_keys:
|
||||
tags[osm_key].append((values, label))
|
||||
return dict(tags)
|
||||
|
||||
def get_identifiers(self):
|
||||
ret = {}
|
||||
for claim, osm_keys, label in property_map:
|
||||
values = [
|
||||
i["mainsnak"]["datavalue"]["value"]
|
||||
for i in self.claims.get(claim, [])
|
||||
if "datavalue" in i["mainsnak"]
|
||||
]
|
||||
if not values:
|
||||
continue
|
||||
if claim == "P782":
|
||||
values += [
|
||||
m.group(1) for m in (re_lau_code.match(v) for v in values) if m
|
||||
]
|
||||
for osm_key in osm_keys:
|
||||
ret[label] = values
|
||||
return ret
|
||||
|
||||
|
||||
# class Claim(Base):
|
||||
# __tablename__ = "claim"
|
||||
# item_id = Column(Integer, primary_key=True)
|
||||
|
|
@ -222,13 +384,14 @@ class Item(Base):
|
|||
# position = Column(Integer, primary_key=True)
|
||||
# mainsnak = Column(postgresql.JSONB)
|
||||
|
||||
class ItemIsA(Base):
|
||||
__tablename__ = 'item_isa'
|
||||
item_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
|
||||
isa_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
|
||||
|
||||
item = relationship('Item', foreign_keys=[item_id])
|
||||
isa = relationship('Item', foreign_keys=[isa_id])
|
||||
class ItemIsA(Base):
|
||||
__tablename__ = "item_isa"
|
||||
item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
|
||||
isa_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
|
||||
|
||||
item = relationship("Item", foreign_keys=[item_id])
|
||||
isa = relationship("Item", foreign_keys=[isa_id])
|
||||
|
||||
|
||||
class ItemLocation(Base):
|
||||
|
|
@ -241,18 +404,21 @@ class ItemLocation(Base):
|
|||
qid = column_property("Q" + cast(item_id, String))
|
||||
pid = column_property("P" + cast(item_id, String))
|
||||
|
||||
def get_lat_lon(self):
|
||||
return session.query(func.ST_Y(self.location),
|
||||
func.ST_X(self.location)).one()
|
||||
def get_lat_lon(self) -> tuple[float, float]:
|
||||
"""Get latitude and longitude of item."""
|
||||
loc: tuple[float, float]
|
||||
loc = session.query(func.ST_Y(self.location), func.ST_X(self.location)).one()
|
||||
return loc
|
||||
|
||||
|
||||
def location_objects(coords):
|
||||
locations = []
|
||||
for pid, coord_list in coords.items():
|
||||
for num, coords in enumerate(coord_list):
|
||||
point = f"POINT({coords['longitude']} {coords['latitude']})"
|
||||
loc = ItemLocation(property_id=int(pid[1:]),
|
||||
statement_order=num,
|
||||
location=point)
|
||||
loc = ItemLocation(
|
||||
property_id=int(pid[1:]), statement_order=num, location=point
|
||||
)
|
||||
locations.append(loc)
|
||||
return locations
|
||||
|
||||
|
|
@ -282,8 +448,7 @@ class MapMixin:
|
|||
@declared_attr
|
||||
def geojson_str(cls):
|
||||
return column_property(
|
||||
func.ST_AsGeoJSON(cls.way, maxdecimaldigits=6),
|
||||
deferred=True
|
||||
func.ST_AsGeoJSON(cls.way, maxdecimaldigits=6), deferred=True
|
||||
)
|
||||
|
||||
@declared_attr
|
||||
|
|
@ -292,17 +457,16 @@ class MapMixin:
|
|||
|
||||
@hybrid_property
|
||||
def has_street_address(self):
|
||||
return ("addr:housenumber" in self.tags
|
||||
and "addr:street" in self.tags)
|
||||
return "addr:housenumber" in self.tags and "addr:street" in self.tags
|
||||
|
||||
def display_name(self):
|
||||
for key in 'bridge:name', 'tunnel:name', 'lock_name':
|
||||
for key in "bridge:name", "tunnel:name", "lock_name":
|
||||
if key in self.tags:
|
||||
return self.tags[key]
|
||||
|
||||
return (self.name
|
||||
or self.tags.get("addr:housename")
|
||||
or self.tags.get("inscription"))
|
||||
return (
|
||||
self.name or self.tags.get("addr:housename") or self.tags.get("inscription")
|
||||
)
|
||||
|
||||
def geojson(self):
|
||||
return json.loads(self.geojson_str)
|
||||
|
|
@ -343,7 +507,7 @@ class Line(MapMixin, Base):
|
|||
|
||||
@classmethod
|
||||
def get_osm(cls, osm_type, osm_id):
|
||||
src_id = osm_id * {'way': 1, 'relation': -1}[osm_type]
|
||||
src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
|
||||
return cls.query.get(src_id)
|
||||
|
||||
|
||||
|
|
@ -352,11 +516,12 @@ class Polygon(MapMixin, Base):
|
|||
|
||||
@classmethod
|
||||
def get_osm(cls, osm_type, osm_id):
|
||||
src_id = osm_id * {'way': 1, 'relation': -1}[osm_type]
|
||||
src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
|
||||
return cls.query.get(src_id)
|
||||
|
||||
@property
|
||||
def type(self):
|
||||
def type(self) -> str:
|
||||
"""Polygon is either a way or a relation."""
|
||||
return "way" if self.src_id > 0 else "relation"
|
||||
|
||||
@declared_attr
|
||||
|
|
@ -364,12 +529,15 @@ class Polygon(MapMixin, Base):
|
|||
return column_property(func.ST_Area(cls.way, False), deferred=True)
|
||||
|
||||
@hybrid_property
|
||||
def area_in_sq_km(self):
|
||||
def area_in_sq_km(self) -> float:
|
||||
"""Size of area in square km."""
|
||||
return self.area / (1000 * 1000)
|
||||
|
||||
|
||||
class User(Base, UserMixin):
|
||||
__tablename__ = 'user'
|
||||
"""User."""
|
||||
|
||||
__tablename__ = "user"
|
||||
id = Column(Integer, primary_key=True)
|
||||
username = Column(String)
|
||||
password = Column(String)
|
||||
|
|
@ -392,23 +560,27 @@ class User(Base, UserMixin):
|
|||
osm_oauth_token = Column(String)
|
||||
osm_oauth_token_secret = Column(String)
|
||||
|
||||
def is_active(self):
|
||||
def is_active(self) -> bool:
|
||||
"""User is active."""
|
||||
return self.active
|
||||
|
||||
|
||||
class EditSession(Base):
|
||||
__tablename__ = 'edit_session'
|
||||
__tablename__ = "edit_session"
|
||||
id = Column(Integer, primary_key=True)
|
||||
user_id = Column(Integer, ForeignKey(User.id))
|
||||
created = Column(DateTime, default=now_utc(), nullable=False)
|
||||
edit_list = Column(postgresql.JSONB)
|
||||
comment = Column(String)
|
||||
|
||||
user = relationship('User')
|
||||
changeset = relationship('Changeset', back_populates='edit_session', uselist=False)
|
||||
user = relationship("User")
|
||||
changeset = relationship("Changeset", back_populates="edit_session", uselist=False)
|
||||
|
||||
|
||||
class Changeset(Base):
|
||||
__tablename__ = 'changeset'
|
||||
"""An OSM Changeset generated by this tool."""
|
||||
|
||||
__tablename__ = "changeset"
|
||||
id = Column(BigInteger, primary_key=True)
|
||||
created = Column(DateTime)
|
||||
comment = Column(String)
|
||||
|
|
@ -416,38 +588,62 @@ class Changeset(Base):
|
|||
update_count = Column(Integer, nullable=False)
|
||||
edit_session_id = Column(Integer, ForeignKey(EditSession.id))
|
||||
|
||||
user = relationship('User',
|
||||
backref=backref('changesets',
|
||||
lazy='dynamic',
|
||||
order_by='Changeset.created.desc()'))
|
||||
user = relationship(
|
||||
"User",
|
||||
backref=backref(
|
||||
"changesets", lazy="dynamic", order_by="Changeset.created.desc()"
|
||||
),
|
||||
)
|
||||
|
||||
edit_session = relationship('EditSession', back_populates='changeset')
|
||||
edit_session = relationship("EditSession", back_populates="changeset")
|
||||
|
||||
|
||||
class ChangesetEdit(Base):
|
||||
__tablename__ = 'changeset_edit'
|
||||
"""Record details of edits within a changeset."""
|
||||
|
||||
changeset_id = Column(BigInteger,
|
||||
ForeignKey('changeset.id'),
|
||||
primary_key=True)
|
||||
__tablename__ = "changeset_edit"
|
||||
|
||||
changeset_id = Column(BigInteger, ForeignKey("changeset.id"), primary_key=True)
|
||||
item_id = Column(Integer, primary_key=True)
|
||||
osm_id = Column(BigInteger, primary_key=True)
|
||||
osm_type = Column(osm_type_enum, primary_key=True)
|
||||
saved = Column(DateTime, default=now_utc(), nullable=False)
|
||||
|
||||
changeset = relationship('Changeset',
|
||||
backref=backref('edits', lazy='dynamic'))
|
||||
changeset = relationship("Changeset", backref=backref("edits", lazy="dynamic"))
|
||||
|
||||
|
||||
class SkipIsA(Base):
|
||||
__tablename__ = 'skip_isa'
|
||||
item_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
|
||||
"""Ignore this item type when walking the Wikidata subclass graph."""
|
||||
|
||||
__tablename__ = "skip_isa"
|
||||
item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
|
||||
qid = column_property("Q" + cast(item_id, String))
|
||||
|
||||
item = relationship("Item")
|
||||
|
||||
item = relationship('Item')
|
||||
|
||||
class ItemExtraKeys(Base):
|
||||
__tablename__ = 'item_extra_keys'
|
||||
item_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
|
||||
"""Extra tag or key to consider for an Wikidata item type."""
|
||||
|
||||
__tablename__ = "item_extra_keys"
|
||||
item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
|
||||
tag_or_key = Column(String, primary_key=True)
|
||||
note = Column(String)
|
||||
qid = column_property("Q" + cast(item_id, String))
|
||||
|
||||
item = relationship('Item')
|
||||
item = relationship("Item")
|
||||
|
||||
|
||||
class Extract(Base):
|
||||
"""First paragraph from Wikipedia."""
|
||||
|
||||
__tablename__ = "extract"
|
||||
|
||||
item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
|
||||
site = Column(String, primary_key=True)
|
||||
extract = Column(String, nullable=False)
|
||||
|
||||
def __init__(self, site: str, extract: str):
|
||||
"""Initialise the object."""
|
||||
self.site = site
|
||||
self.extract = extract
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue