Reformat with black

This commit is contained in:
Edward Betts 2022-04-18 12:51:02 +01:00
parent 74788872a3
commit c95b58fde9
1 changed files with 106 additions and 87 deletions

View File

@ -20,11 +20,12 @@ import re
Base = declarative_base()
Base.query = session.query_property()
re_point = re.compile(r'^POINT\((.+) (.+)\)$')
re_point = re.compile(r"^POINT\((.+) (.+)\)$")
osm_type_enum = postgresql.ENUM(
"node", "way", "relation", name="osm_type_enum", metadata=Base.metadata
)
osm_type_enum = postgresql.ENUM('node', 'way', 'relation',
name='osm_type_enum',
metadata=Base.metadata)
class Item(Base):
__tablename__ = "item"
@ -35,7 +36,9 @@ class Item(Base):
sitelinks = Column(postgresql.JSONB)
claims = Column(postgresql.JSONB)
lastrevid = Column(Integer, nullable=False, unique=True)
locations = relationship("ItemLocation", cascade="all, delete-orphan", backref="item")
locations = relationship(
"ItemLocation", cascade="all, delete-orphan", backref="item"
)
qid = column_property("Q" + cast(item_id, String))
wiki_extracts = relationship(
@ -56,44 +59,48 @@ class Item(Base):
return f"https://www.wikidata.org/wiki/{self.qid}"
def get_claim(self, pid):
return [i["mainsnak"]["datavalue"]["value"] if "datavalue" in i["mainsnak"] else None
for i in self.claims.get(pid, [])]
return [
i["mainsnak"]["datavalue"]["value"]
if "datavalue" in i["mainsnak"]
else None
for i in self.claims.get(pid, [])
]
def label(self, lang='en'):
def label(self, lang="en"):
if lang in self.labels:
return self.labels[lang]['value']
elif 'en' in self.labels:
return self.labels['en']['value']
return self.labels[lang]["value"]
elif "en" in self.labels:
return self.labels["en"]["value"]
label_list = list(self.labels.values())
return label_list[0]['value'] if label_list else '[no label]'
return label_list[0]["value"] if label_list else "[no label]"
def description(self, lang='en'):
def description(self, lang="en"):
if lang in self.descriptions:
return self.descriptions[lang]['value']
elif 'en' in self.descriptions:
return self.descriptions['en']['value']
return self.descriptions[lang]["value"]
elif "en" in self.descriptions:
return self.descriptions["en"]["value"]
return
d_list = list(self.descriptions.values())
if d_list:
return d_list[0]['value']
return d_list[0]["value"]
def get_aliases(self, lang='en'):
def get_aliases(self, lang="en"):
if lang not in self.aliases:
if 'en' not in self.aliases:
if "en" not in self.aliases:
return []
lang = 'en'
return [a['value'] for a in self.aliases[lang]]
lang = "en"
return [a["value"] for a in self.aliases[lang]]
def get_part_of_names(self):
if not self.claims:
return set()
part_of_names = set()
for p361 in self.claims.get('P361', []):
for p361 in self.claims.get("P361", []):
try:
part_of_id = p361['mainsnak']['datavalue']['value']['numeric-id']
part_of_id = p361["mainsnak"]["datavalue"]["value"]["numeric-id"]
except KeyError:
continue
if part_of_id == self.item_id:
@ -108,7 +115,7 @@ class Item(Base):
@property
def entity(self):
keys = ['labels', 'aliases', 'descriptions', 'sitelinks', 'claims']
keys = ["labels", "aliases", "descriptions", "sitelinks", "claims"]
return {key: getattr(self, key) for key in keys}
def names(self, check_part_of=True):
@ -117,22 +124,24 @@ class Item(Base):
d = wikidata.names_from_entity(self.entity) or defaultdict(list)
for name, sources in list(d.items()):
if len(sources) == 1 and sources[0][0] == 'image':
if len(sources) == 1 and sources[0][0] == "image":
continue
for part_of_name in part_of_names:
if not name.startswith(part_of_name):
continue
prefix_removed = name[len(part_of_name):].strip()
prefix_removed = name[len(part_of_name) :].strip()
if prefix_removed not in d:
d[prefix_removed] = sources
if self.claims:
for p6375 in self.claims.get('P6375', []):
for p6375 in self.claims.get("P6375", []):
try:
street_address = p6375['mainsnak']['datavalue']['value']
street_address = p6375["mainsnak"]["datavalue"]["value"]
except KeyError:
continue
d[street_address['text']].append(('P6375', street_address.get('language')))
d[street_address["text"]].append(
("P6375", street_address.get("language"))
)
# A terrace of buildings can be illustrated with a photo of a single building.
# We try to determine if this is the case and avoid using the filename of the
@ -141,8 +150,11 @@ class Item(Base):
def has_digit(s):
return any(c.isdigit() for c in s)
image_names = {name for name, sources in d.items()
if len(sources) == 1 and sources[0][0] == 'image' and has_digit(name)}
image_names = {
name
for name, sources in d.items()
if len(sources) == 1 and sources[0][0] == "image" and has_digit(name)
}
if not image_names:
return dict(d) or None
@ -176,10 +188,10 @@ class Item(Base):
isa_qids = self.get_isa_qids()
matching_types = {
"Q12731", # dead end street
"Q34442", # road
"Q79007", # street
"Q83620", # thoroughfare
"Q12731", # dead end street
"Q34442", # road
"Q79007", # street
"Q83620", # thoroughfare
"Q21000333", # shopping street
"Q62685721", # pedestrian street
}
@ -189,14 +201,13 @@ class Item(Base):
if isa_qids is None:
isa_qids = self.get_isa_qids()
matching_types = {
"Q355304", # watercourse
"Q4022", # river
"Q47521", # stream
"Q1437299", # creek
"Q355304", # watercourse
"Q4022", # river
"Q47521", # stream
"Q1437299", # creek
"Q63565252", # brook
"Q12284", # canal
"Q12284", # canal
"Q55659167", # natural watercourse
}
return bool(matching_types & set(isa_qids))
@ -205,11 +216,16 @@ class Item(Base):
return self.is_street(isa_qids) or self.is_watercourse(isa_qids)
def is_tram_stop(self):
return 'Q2175765' in self.get_isa_qids()
return "Q2175765" in self.get_isa_qids()
def alert_admin_about_bad_time(self, v):
body = ("Wikidata item has an unsupported time precision\n\n"
+ self.wd_url + "\n\n" + "Value:\n\n" + json.dumps(v, indent=2))
body = (
"Wikidata item has an unsupported time precision\n\n"
+ self.wd_url
+ "\n\n"
+ "Value:\n\n"
+ json.dumps(v, indent=2)
)
mail.send_mail(f"OWL Map: bad time value in {self.qid}", body)
def time_claim(self, pid):
@ -271,6 +287,7 @@ class Item(Base):
return text[: first_end_p_tag + len(close_tag)]
# class Claim(Base):
# __tablename__ = "claim"
# item_id = Column(Integer, primary_key=True)
@ -278,13 +295,14 @@ class Item(Base):
# position = Column(Integer, primary_key=True)
# mainsnak = Column(postgresql.JSONB)
class ItemIsA(Base):
__tablename__ = 'item_isa'
item_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
isa_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
item = relationship('Item', foreign_keys=[item_id])
isa = relationship('Item', foreign_keys=[isa_id])
class ItemIsA(Base):
__tablename__ = "item_isa"
item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
isa_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
item = relationship("Item", foreign_keys=[item_id])
isa = relationship("Item", foreign_keys=[isa_id])
class ItemLocation(Base):
@ -298,17 +316,17 @@ class ItemLocation(Base):
pid = column_property("P" + cast(item_id, String))
def get_lat_lon(self):
return session.query(func.ST_Y(self.location),
func.ST_X(self.location)).one()
return session.query(func.ST_Y(self.location), func.ST_X(self.location)).one()
def location_objects(coords):
locations = []
for pid, coord_list in coords.items():
for num, coords in enumerate(coord_list):
point = f"POINT({coords['longitude']} {coords['latitude']})"
loc = ItemLocation(property_id=int(pid[1:]),
statement_order=num,
location=point)
loc = ItemLocation(
property_id=int(pid[1:]), statement_order=num, location=point
)
locations.append(loc)
return locations
@ -338,8 +356,7 @@ class MapMixin:
@declared_attr
def geojson_str(cls):
return column_property(
func.ST_AsGeoJSON(cls.way, maxdecimaldigits=6),
deferred=True
func.ST_AsGeoJSON(cls.way, maxdecimaldigits=6), deferred=True
)
@declared_attr
@ -348,17 +365,16 @@ class MapMixin:
@hybrid_property
def has_street_address(self):
return ("addr:housenumber" in self.tags
and "addr:street" in self.tags)
return "addr:housenumber" in self.tags and "addr:street" in self.tags
def display_name(self):
for key in 'bridge:name', 'tunnel:name', 'lock_name':
for key in "bridge:name", "tunnel:name", "lock_name":
if key in self.tags:
return self.tags[key]
return (self.name
or self.tags.get("addr:housename")
or self.tags.get("inscription"))
return (
self.name or self.tags.get("addr:housename") or self.tags.get("inscription")
)
def geojson(self):
return json.loads(self.geojson_str)
@ -399,7 +415,7 @@ class Line(MapMixin, Base):
@classmethod
def get_osm(cls, osm_type, osm_id):
src_id = osm_id * {'way': 1, 'relation': -1}[osm_type]
src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
return cls.query.get(src_id)
@ -408,7 +424,7 @@ class Polygon(MapMixin, Base):
@classmethod
def get_osm(cls, osm_type, osm_id):
src_id = osm_id * {'way': 1, 'relation': -1}[osm_type]
src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
return cls.query.get(src_id)
@property
@ -425,7 +441,7 @@ class Polygon(MapMixin, Base):
class User(Base, UserMixin):
__tablename__ = 'user'
__tablename__ = "user"
id = Column(Integer, primary_key=True)
username = Column(String)
password = Column(String)
@ -451,20 +467,21 @@ class User(Base, UserMixin):
def is_active(self):
return self.active
class EditSession(Base):
__tablename__ = 'edit_session'
__tablename__ = "edit_session"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey(User.id))
created = Column(DateTime, default=now_utc(), nullable=False)
edit_list = Column(postgresql.JSONB)
comment = Column(String)
user = relationship('User')
changeset = relationship('Changeset', back_populates='edit_session', uselist=False)
user = relationship("User")
changeset = relationship("Changeset", back_populates="edit_session", uselist=False)
class Changeset(Base):
__tablename__ = 'changeset'
__tablename__ = "changeset"
id = Column(BigInteger, primary_key=True)
created = Column(DateTime)
comment = Column(String)
@ -472,41 +489,43 @@ class Changeset(Base):
update_count = Column(Integer, nullable=False)
edit_session_id = Column(Integer, ForeignKey(EditSession.id))
user = relationship('User',
backref=backref('changesets',
lazy='dynamic',
order_by='Changeset.created.desc()'))
user = relationship(
"User",
backref=backref(
"changesets", lazy="dynamic", order_by="Changeset.created.desc()"
),
)
edit_session = relationship('EditSession', back_populates='changeset')
edit_session = relationship("EditSession", back_populates="changeset")
class ChangesetEdit(Base):
__tablename__ = 'changeset_edit'
__tablename__ = "changeset_edit"
changeset_id = Column(BigInteger,
ForeignKey('changeset.id'),
primary_key=True)
changeset_id = Column(BigInteger, ForeignKey("changeset.id"), primary_key=True)
item_id = Column(Integer, primary_key=True)
osm_id = Column(BigInteger, primary_key=True)
osm_type = Column(osm_type_enum, primary_key=True)
saved = Column(DateTime, default=now_utc(), nullable=False)
changeset = relationship('Changeset',
backref=backref('edits', lazy='dynamic'))
changeset = relationship("Changeset", backref=backref("edits", lazy="dynamic"))
class SkipIsA(Base):
__tablename__ = 'skip_isa'
item_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
__tablename__ = "skip_isa"
item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
item = relationship("Item")
item = relationship('Item')
class ItemExtraKeys(Base):
__tablename__ = 'item_extra_keys'
item_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
__tablename__ = "item_extra_keys"
item_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
tag_or_key = Column(String, primary_key=True)
note = Column(String)
item = relationship('Item')
item = relationship("Item")
class Extract(Base):
__tablename__ = "extract"