owl-map/matcher/mail.py

127 lines
3 KiB
Python
Raw Permalink Normal View History

2023-05-14 16:00:53 +01:00
"""Send email to admins about errors or other noteworthy things."""
import pprint
2023-05-14 11:37:14 +01:00
import smtplib
import sys
import traceback
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
2023-05-14 11:37:14 +01:00
2023-05-14 16:00:53 +01:00
import flask
import requests
2023-05-14 11:37:14 +01:00
from flask import current_app, g, has_request_context, request
2023-05-14 16:00:53 +01:00
from . import wikidata_api
2023-05-14 16:00:53 +01:00
def send_mail(
    subject: str, body: str, config: flask.config.Config | None = None
) -> None:
    """Send an email to admins, catch and ignore mail delivery exceptions.

    This is the best-effort wrapper around ``send_mail_main``: a failure to
    deliver the message must not break the caller.

    Args:
        subject: Subject line of the message.
        body: Plain-text body of the message.
        config: Passed through unchanged to ``send_mail_main``.
    """
    try:
        send_mail_main(subject, body, config=config)
    except (smtplib.SMTPException, OSError):
        # Ignore email errors.  Catching only SMTPDataError let connection
        # failures (refused connection, DNS failure, timeout) and every other
        # SMTP error escape, contrary to the contract stated above.  Anything
        # that is not an SMTP/OS-level failure (e.g. a missing configuration
        # key) still propagates.
        pass
2023-05-14 16:00:53 +01:00
def send_mail_main(
    subject: str, body: str, config: flask.config.Config | None = None
) -> None:
    """Send an email to admins.

    Builds a plain-text UTF-8 message and hands it to the SMTP server named in
    the configuration.  Exceptions are not handled here.

    Args:
        subject: Value for the ``Subject`` header.
        body: Plain-text body of the message.
        config: Configuration mapping to read; ``current_app.config`` is used
            when this is ``None``.  The keys read are ``ADMIN_EMAIL``,
            ``MAIL_FROM``, ``SMTP_HOST`` and the optional ``MAIL_HEADERS``
            (a list of ``(header name, value)`` pairs).
    """
    if config is None:
        config = current_app.config

    mail_to = config["ADMIN_EMAIL"]
    mail_from = config["MAIL_FROM"]

    msg = MIMEText(body, "plain", "UTF-8")
    msg["Subject"] = subject
    msg["To"] = mail_to
    msg["From"] = mail_from
    msg["Date"] = formatdate()
    msg["Message-ID"] = make_msgid()

    extra_mail_headers: list[tuple[str, str]] = config.get("MAIL_HEADERS", [])
    for key, value in extra_mail_headers:
        # Configured extras must not repeat a header that was set above.
        assert key not in msg
        msg[key] = value

    # The context manager closes the connection even when sendmail() raises;
    # previously quit() was skipped on failure and the socket was left open.
    with smtplib.SMTP(config["SMTP_HOST"]) as smtp:
        smtp.sendmail(mail_from, [mail_to], msg.as_string())
2023-05-14 16:00:53 +01:00
def get_username() -> str:
    """Return a label for the current user, taken from ``g.user``.

    The result is the username when ``g.user`` exists and is authenticated,
    "not authenticated" when it exists but is not, and "no user" when ``g``
    has no ``user`` attribute.
    """
    if not hasattr(g, "user"):
        return "no user"
    if not g.user.is_authenticated:
        return "not authenticated"
    return g.user.username
2023-05-14 16:00:53 +01:00
def error_mail(
    subject: str, data: str, r: requests.Response, via_web: bool = True
) -> None:
    """Email admins the details of a failed HTTP request.

    Args:
        subject: Subject line of the message.
        data: The request data that was sent, included verbatim in the mail.
        r: The response received; its URL, status code, content type and text
            are included in the mail.
        via_web: Not used by this function; kept so existing callers that
            pass it keep working.
    """
    # .get() rather than indexing: a reply without a Content-Type header must
    # not raise KeyError and prevent the error report from being sent.
    content_type = r.headers.get("content-type")
    body = f"""
remote URL: {r.url}
status code: {r.status_code}
request data:
{data}
status code: {r.status_code}
content-type: {content_type}
reply:
{r.text}
"""
    # Inside a request, prepend the page being served and the current user.
    if has_request_context():
        body = f"site URL: {request.url}\nuser: {get_username()}\n" + body
    send_mail(subject, body)
2023-05-14 16:00:53 +01:00
def open_changeset_error(session_id: int, changeset: str, r: requests.Response) -> None:
    """Send error mail when failing to open a changeset.

    Args:
        session_id: Not used by this function; kept so existing callers that
            pass it keep working.
        changeset: The changeset text that was sent, included in the mail.
        r: The response received; its URL and text are included in the mail.
    """
    from urllib.parse import quote

    username = g.user.username
    # Percent-encode the name so the link stays usable when it contains
    # spaces or characters that are reserved in URLs.  str() keeps the same
    # tolerance the f-string interpolation had for non-string values.
    message_url = "https://www.openstreetmap.org/message/new/" + quote(
        str(username), safe=""
    )
    body = f"""
user: {username}
page: {r.url}
message user: {message_url}
sent:
{changeset}
reply:
{r.text}
"""
    send_mail("error creating changeset", body)
def send_traceback(info: str, prefix: str = "osm-wikidata") -> None:
    """Email admins the traceback of the exception currently being handled.

    Args:
        info: Extra text placed before the traceback in the mail body.
        prefix: Text placed at the start of the subject line.
    """
    exc_type = sys.exc_info()[0]
    # exc_info() yields None when no exception is being handled; fall back to
    # a placeholder instead of failing with AttributeError on None.__name__.
    exception_name = exc_type.__name__ if exc_type is not None else "unknown"
    subject = f"{prefix} error: {exception_name}"
    body = f"user: {get_username()}\n" + info + "\n" + traceback.format_exc()
    send_mail(subject, body)
2023-05-14 16:00:53 +01:00
def datavalue_missing(field: str, entity: wikidata_api.EntityType) -> None:
    """Email admins about an entity that has no datavalue in *field*.

    The mail links to the entity's Wikidata page (built from its "title") and
    contains a pretty-printed dump of the whole entity.
    """
    qid = entity["title"]
    entity_dump = pprint.pformat(entity)
    send_mail(
        f"{qid}: datavalue missing in {field}",
        f"https://www.wikidata.org/wiki/{qid}\n\n{entity_dump}",
    )