diff --git a/sourcing/database.py b/sourcing/database.py index 81406e3..8ff6c36 100644 --- a/sourcing/database.py +++ b/sourcing/database.py @@ -15,7 +15,7 @@ def init_db(db_url: str) -> None: def get_engine(db_url: str) -> sqlalchemy.engine.base.Engine: """Create an engine object.""" - return create_engine(db_url, pool_recycle=3600) + return create_engine(db_url, echo=False, pool_recycle=3600) def init_app(app: flask.app.Flask) -> None: diff --git a/sourcing/edl.py b/sourcing/edl.py index f31a28c..9ec0a6b 100644 --- a/sourcing/edl.py +++ b/sourcing/edl.py @@ -81,7 +81,7 @@ def fulfil_edl_with_sources( def fulfil_edl_with_links( edl: EDLDict, - doc_num: str = "", + doc_num: int | str = "", links: list[dict[str, str]] | None = None, hide_all_transclusions: bool = False, ): diff --git a/sourcing/forms.py b/sourcing/forms.py index 2515ec5..c6c1c40 100644 --- a/sourcing/forms.py +++ b/sourcing/forms.py @@ -1,82 +1,135 @@ from flask_wtf import FlaskForm -from wtforms.fields import StringField, PasswordField, BooleanField, HiddenField, TextAreaField, FileField, IntegerField -from wtforms.validators import InputRequired, Email, Length, ValidationError, Regexp, NoneOf, Optional -from .model import User, LoginError, re_username, reserved_name, user_exists +from wtforms.fields import ( + BooleanField, + FileField, + HiddenField, + IntegerField, + PasswordField, + StringField, + TextAreaField, +) +from wtforms.validators import ( + Email, + InputRequired, + Length, + NoneOf, + Optional, + Regexp, + ValidationError, +) + +from .model import LoginError, User, re_username, reserved_name, user_exists PASSWORD_LEN = 64 EMAIL_LEN = 64 + class SignupForm(FlaskForm): - username = StringField('username', - [InputRequired(), - Regexp(re_username), - NoneOf(reserved_name, message='Not available.'), - Length(min=3, max=64)], - [lambda name: name and name.replace(' ', '_')]) - email = StringField('e-mail address', - [InputRequired(), Email(), - Length(min=5, max=EMAIL_LEN)], - description="we never share your e-mail address") - password = StringField('password', - [InputRequired(), Length(min=4, max=PASSWORD_LEN)]) + """Signup form.""" + + username = StringField( + "username", + [ + InputRequired(), + Regexp(re_username), + NoneOf(reserved_name, message="Not available."), + Length(min=3, max=64), + ], + [lambda name: name and name.replace(" ", "_")], + ) + email = StringField( + "e-mail address", + [InputRequired(), Email(), Length(min=5, max=EMAIL_LEN)], + description="we never share your e-mail address", + ) + password = StringField( + "password", [InputRequired(), Length(min=4, max=PASSWORD_LEN)] + ) def validate_username(form, field): if user_exists(User.username, field.data): - raise ValidationError('Not available') + raise ValidationError("Not available") def validate_email(form, field): if user_exists(User.email, field.data): - raise ValidationError('In use by another account') + raise ValidationError("In use by another account") + class LoginForm(FlaskForm): - user_or_email = StringField('username or e-mail address', - [InputRequired(), Length(min=3, max=EMAIL_LEN)], - [lambda name: name and name.replace(' ', '_')]) - password = PasswordField('password', - [InputRequired(), Length(max=PASSWORD_LEN)]) - remember = BooleanField('stay logged in') - next = HiddenField('next') + """Login form.""" - def validate(self): + user_or_email = StringField( + "username or e-mail address", + [InputRequired(), Length(min=3, max=EMAIL_LEN)], + [lambda name: name and name.replace(" ", "_")], + ) + password = PasswordField("password", [InputRequired(), Length(max=PASSWORD_LEN)]) + remember = BooleanField("stay logged in") + next = HiddenField("next") + + def validate(self) -> bool: + """Validate.""" rv = FlaskForm.validate(self) if not rv: return False try: - self.user = User.attempt_login(self.user_or_email.data, - self.password.data) + self.user = User.attempt_login(self.user_or_email.data, self.password.data) return True except LoginError as e: self.user_or_email.errors.append(e.msg) return False + class ForgotPasswordForm(FlaskForm): - user_or_email = StringField('username or e-mail address', - [InputRequired(), Length(max=EMAIL_LEN)]) + """Forgot password form.""" + + user_or_email = StringField( + "username or e-mail address", [InputRequired(), Length(max=EMAIL_LEN)] + ) + class PasswordForm(FlaskForm): - password = PasswordField('new password', - [InputRequired(), Length(min=4, max=PASSWORD_LEN)]) + """Password form.""" + + password = PasswordField( + "new password", [InputRequired(), Length(min=4, max=PASSWORD_LEN)] + ) + class AccountSettingsForm(FlaskForm): - full_name = StringField('full name', [Length(max=64)]) - email = StringField('e-mail address', - [InputRequired(), Email(), - Length(min=5, max=EMAIL_LEN)]) + """Account settings form.""" + + full_name = StringField("full name", [Length(max=64)]) + email = StringField( + "e-mail address", [InputRequired(), Email(), Length(min=5, max=EMAIL_LEN)] + ) + class ChangePasswordForm(FlaskForm): - old_password = PasswordField('current password', - [InputRequired(), Length(max=PASSWORD_LEN)]) - new_password = PasswordField('new password', - [InputRequired(), Length(max=PASSWORD_LEN)]) + """Change password form.""" + + old_password = PasswordField( + "current password", [InputRequired(), Length(max=PASSWORD_LEN)] + ) + new_password = PasswordField( + "new password", [InputRequired(), Length(max=PASSWORD_LEN)] + ) + class SourceDocForm(FlaskForm): - text = TextAreaField('text', [InputRequired()]) - db_price_per_character = IntegerField('price per character', [Optional()]) - db_document_price = IntegerField('document price', [Optional()]) + """Source doc form.""" + + text = TextAreaField("text", [InputRequired()]) + db_price_per_character = IntegerField("price per character", [Optional()]) + db_document_price = IntegerField("document price", [Optional()]) + class ItemForm(FlaskForm): - text = TextAreaField('text', [InputRequired()]) + """Item form.""" + + text = TextAreaField("text", [InputRequired()]) + class UploadSourceDocForm(FlaskForm): - sourcedoc_file = FileField('SourceDoc', [Regexp(r'^[^/\\]+\.txt$')]) + sourcedoc_file = FileField("SourceDoc", [Regexp(r"^[^/\\]+\.txt$")]) diff --git a/sourcing/model.py b/sourcing/model.py index beb8024..ed3eb06 100644 --- a/sourcing/model.py +++ b/sourcing/model.py @@ -27,7 +27,7 @@ from sqlalchemy_continuum.plugins import ActivityPlugin, FlaskPlugin from werkzeug.security import check_password_hash, generate_password_hash from .database import session -from .parse import parse_link, parse_span +from .parse import ParsedLink, SpanTuple, parse_link, parse_span from .text import first_non_empty_line activity_plugin = ActivityPlugin() @@ -79,27 +79,37 @@ re_username = re.compile(r"^\w+$", re.U) re_full_name = re.compile(r'^([-.\'" ]|[^\W\d_])+$', re.U) re_comment = re.compile(r"#.*") +AllTitles = dict["Item", str] + def item_url(): return url_for("view.view_item", username=self.user.username, hashid=self.hashid) -def user_exists(field, value): - return session.query(exists().where(field == value)).scalar() +def user_exists(field: Column[str], value: str) -> bool: + """Check that user exists.""" + return typing.cast(bool, session.query(exists().where(field == value)).scalar()) class TimeStampedModel(Base): + """Time stamped model.""" + __abstract__ = True created = Column(DateTime, default=func.now()) modified = Column(DateTime, default=func.now(), onupdate=func.now()) class LoginError(Exception): - def __init__(self, msg): + """Login error.""" + + def __init__(self, msg: str): + """Init.""" self.msg = msg class User(TimeStampedModel, UserMixin): + """User model.""" + __tablename__ = "user" id = Column(Integer, primary_key=True) username = Column(Unicode(32), unique=True, nullable=False) @@ -118,28 +128,35 @@ class User(TimeStampedModel, UserMixin): user_name = synonym("username") def __init__(self, **kwargs): + """Init.""" pw_hash = generate_password_hash(kwargs.pop("password")) return super(User, self).__init__(pw_hash=pw_hash, **kwargs) - def __repr__(self): + def __repr__(self) -> str: + """Repr.""" return "<User: {!r}>".format(self.username) - def set_password(self, password): + def set_password(self, password: str) -> None: + """Set password.""" self.pw_hash = generate_password_hash(password) - def check_password(self, password): + def check_password(self, password: str) -> bool: + """Check password.""" return check_password_hash(self.pw_hash, password) - def get_id(self): + def get_id(self) -> int: + """Get ID.""" return self.id @validates("email") - def validate_email(self, key, value): + def validate_email(self, key, value: str) -> str: + """Validate email.""" assert "@" in value return value @validates("username") def validate_usernane(self, key, value): + """Validate username.""" assert re_username.match(value) return value @@ -150,21 +167,24 @@ class User(TimeStampedModel, UserMixin): return value @hybrid_property - def is_live(self): - return self.email_verified & ~self.disabled & ~self.deleted + def is_live(self) -> bool: + """User account is live.""" + return bool(self.email_verified & ~self.disabled & ~self.deleted) @classmethod - def lookup_user_or_email(cls, user_or_email): + def lookup_user_or_email(cls, user_or_email: str) -> User: + """Lookup user or email.""" field = cls.email if "@" in user_or_email else cls.username - return cls.query.filter(field == user_or_email).one_or_none() + return typing.cast(User, cls.query.filter(field == user_or_email).one_or_none()) @property - def mail_to_name(self): + def mail_to_name(self) -> str: """Name to use on e-mails sent to the user.""" return self.full_name or self.username @classmethod - def attempt_login(cls, user_or_email, password): + def attempt_login(cls, user_or_email: str, password: str) -> User: + """Attempt login.""" user = cls.lookup_user_or_email(user_or_email) if not user: raise LoginError("user not found") @@ -208,7 +228,7 @@ class Item(TimeStampedModel): primaryjoin=id == Reference.subject_id, secondaryjoin=id == Reference.object_id, ) - user = relationship("User", backref="items") + user = relationship("User", backref="items", lazy="select") __mapper_args__ = { "polymorphic_on": type, @@ -218,7 +238,7 @@ class Item(TimeStampedModel): @property def hashid(self) -> str: """Hashid for item.""" - return doc_hashids.encode(self.id) + return typing.cast(str, doc_hashids.encode(self.id)) @classmethod def get_by_hashid(cls, hashid: str) -> Item | None: @@ -229,7 +249,7 @@ class Item(TimeStampedModel): return None return typing.cast("Item", cls.query.get(item_id)) - def view_url(self, endpoint, **kwargs) -> str: + def view_url(self, endpoint: str, **kwargs) -> str: return url_for( "view." + endpoint, username=self.user.username, @@ -239,21 +259,27 @@ class Item(TimeStampedModel): @property def url(self) -> str: + """URL for to view this item.""" return self.view_url("view_item") - def url_fragment(self): + def url_fragment(self) -> str: + """URL fragment.""" return self.user.username + "/" + self.hashid - def version_url(self, version): + def version_url(self, version: int) -> str: + """URL for version.""" return self.view_url("view_item", v=version) @property - def history_url(self): + def history_url(self) -> str: + """History URL.""" return self.view_url("history") @property - def external_url(self): + def external_url(self) -> str: + """External URL.""" base_url = current_app.config.get("BASE_URL") + assert base_url and isinstance(base_url, str) if not base_url.endswith("/"): base_url += "/" if base_url: @@ -262,19 +288,23 @@ class Item(TimeStampedModel): return self.view_url("view_item", _external=True) @property - def edit_url(self): + def edit_url(self) -> str: + """Edit URL.""" return self.view_url("edit_item") @property - def set_title_url(self): + def set_title_url(self) -> str: + """Set title URL.""" return self.view_url("set_title") - def title_from_link(self, titles=None): - if not titles: + def title_from_link(self, titles: AllTitles | None = None) -> str | None: + """Get title from link.""" + if titles is None: titles = XanaLink.get_all_titles() return titles.get(self) - def title(self, titles=None): + def title(self, titles: AllTitles | None = None) -> str: + """Get title.""" return self.type + ": " + (self.title_from_link(titles) or self.hashid) def has_title(self) -> bool: @@ -282,7 +312,8 @@ class Item(TimeStampedModel): titles = XanaLink.get_all_titles() return self in titles - def set_title(self, title, user): + def set_title(self, title: str, user: User) -> None: + """Set item title.""" title_source_doc = SourceDoc(text=title, user=user) session.add(title_source_doc) session.commit() @@ -299,8 +330,20 @@ span: {},start=0,length={}""".format( session.commit() @classmethod - def from_external(cls, url: str, home: str | None = None) -> None | "Item": + def from_external(cls, url: str, home: str | None = None) -> None | Item: """Get item from URL.""" + + username: str | None + hashid: str | None + + parts = url.split("/") + username, hashid = parts[-2:] + item_id = doc_hashids.decode(hashid)[0] + q = cls.query.filter(User.username == username, cls.id == item_id) + item = q.one_or_none() + if item: + return typing.cast(Item, item) + base = current_app.config.get("BASE_URL") username, hashid = None, None if home is None: @@ -329,18 +372,23 @@ class XanaPage(Item): id = Column(Integer, ForeignKey(Item.id), primary_key=True) - def snippet(self): + def snippet(self) -> str: + """Snippet of text.""" return self.text @property - def xanaedit_url(self): + def xanaedit_url(self) -> str: + """XanaExit URL.""" return self.view_url("xanaedit_item") @property - def save_xanaedit_url(self): + def save_xanaedit_url(self) -> str: + """XanaExit save URL.""" return self.view_url("save_xanaedit") - def iter_spans(self): + def iter_spans(self) -> typing.Iterator[SpanTuple]: + """Span iterator.""" + assert self.text is not None for line in self.text.splitlines(): line = re_comment.sub("", line).strip() if not line: @@ -364,25 +412,32 @@ class XanaPage(Item): class XanaLink(Item): + """XanaLink.""" + __tablename__ = "xanalink" __mapper_args__ = {"polymorphic_identity": "xanalink"} id = Column(Integer, ForeignKey(Item.id), primary_key=True) - def parse(self): + def parse(self) -> ParsedLink: + """Parse link.""" + assert self.text is not None return parse_link(self.text) @property - def link_type(self): + def link_type(self) -> str: + """Get link type.""" return self.parse()["type"] - def title(self, titles=None): + def title(self, titles: AllTitles | None = None) -> str: + """Title of link.""" if titles is None: titles = XanaLink.get_all_titles() if self in titles: return self.type + ": " + titles[self] parsed = self.parse() + assert isinstance(parsed["type"], str) if parsed["type"] == "title": ident = parsed["facets"][0][0].partition(": ")[2] @@ -418,7 +473,8 @@ class XanaLink(Item): return (item, source_of_title.text[start : length + start]) @classmethod - def get_all_titles(cls, home: str | None = None) -> dict["Item", str]: + def get_all_titles(cls, home: str | None = None) -> AllTitles: + """Get all known titles.""" titles = {} for link in cls.query: ret = link.item_and_title(home) @@ -428,7 +484,8 @@ class XanaLink(Item): titles[item] = title return titles - def snippet(self): + def snippet(self) -> str | None: + """Snippet of text.""" return self.text @@ -442,13 +499,23 @@ class SourceDoc(Item): @property def document_price(self): + assert self.text is not None return self.db_document_price or self.db_price_per_character * len(self.text) @property def price_per_character(self): + assert self.text is not None return self.db_price_per_character or self.db_document_price / len(self.text) - def snippet(self, length=255, killwords=False, end="...", leeway=5): + def snippet( + self, + length: int = 255, + killwords: bool = False, + end: str = "...", + leeway: int = 5, + ) -> str: + """Get snippet of text.""" + assert self.text is not None s = self.text assert length >= len(end), "expected length >= %s, got %s" % (len(end), length) assert leeway >= 0, "expected leeway >= 0, got %s" % leeway @@ -459,26 +526,33 @@ class SourceDoc(Item): result = s[: length - len(end)].rsplit(" ", 1)[0] return result + end - def raw_title(self): + def raw_title(self) -> str: + """Raw title.""" return self.title(with_type=False) - def title(self, titles=None, with_type=True): + def title(self, titles: AllTitles | None = None, with_type: bool = True) -> str: + """Source document title.""" start = self.type + ": " if with_type else "" - titles = XanaLink.get_all_titles() + if titles is None: + titles = XanaLink.get_all_titles() from_link = self.title_from_link(titles=titles) if from_link: return start + from_link + assert self.text is not None first_line = first_non_empty_line(self.text) if first_line: return start + first_line return start + self.hashid @property - def create_xanapage_url(self): + def create_xanapage_url(self) -> str: + """Create xanapage URL.""" return self.view_url("create_xanapage_from_sourcedoc") @property - def entire_span(self): + def entire_span(self) -> str: + """Entire span.""" + assert self.text is not None return self.external_url + f",start=0,length={len(self.text)}" diff --git a/sourcing/parse.py b/sourcing/parse.py index eef5a9f..f4814b1 100644 --- a/sourcing/parse.py +++ b/sourcing/parse.py @@ -3,7 +3,7 @@ import re import typing from html import escape -from .url import ExternalText +from .types import ExternalText re_span_pointer = re.compile(r"span: (.*),start=(\d+),length=(\d+)") re_xanalink = re.compile(r"xanalink: +([^ ]+) *$") @@ -28,6 +28,8 @@ SpanTuple = tuple[str, int, int] SourceText = dict[str, ExternalText] +ParsedLink = dict[str, str | None | list[list[str]]] + def parse_span(line: str) -> None | SpanTuple: """Parse a span.""" @@ -118,7 +120,7 @@ def parse_xanapage_facet(facet: list[str]) -> str: return leg[len(prefix) :] -def parse_link(link_text: str) -> dict[str, str | None | list[list[str]]]: +def parse_link(link_text: str) -> ParsedLink: link_type = None expect = "link_type" facets = [] diff --git a/sourcing/templates/home.html b/sourcing/templates/home.html index 0fc61d8..27ef2f4 100644 --- a/sourcing/templates/home.html +++ b/sourcing/templates/home.html @@ -68,8 +68,12 @@ {% for doc in docs %} <div data-id="{{ doc.id }}" class="card border-primary my-2"> - <h5 class="card-header"><a href="{{ doc.url }}">{{ doc.title() }}</a> - — {{ doc.user.username }} — {{ doc.created | datetime }}</h5> + <h5 class="card-header"> + <a href="{{ doc.url }}">{{ doc.title(titles=titles) }}</a> + {# + — {{ doc.user.username }} — {{ doc.created | datetime }} + #} + </h5> <div class="card-body"> <p class="card-text"> {%- for line in doc.snippet().splitlines() -%} @@ -79,6 +83,7 @@ </div> </div> {% endfor %} + <div id="lower-buttons"> {{ new_buttons() }} </div> diff --git a/sourcing/text.py b/sourcing/text.py index 51fdaa3..a3bf9b0 100644 --- a/sourcing/text.py +++ b/sourcing/text.py @@ -1,12 +1,18 @@ -import re +"""Text utility functions.""" + import random +import re +import typing -re_newline = re.compile('\r?\n') +re_newline = re.compile("\r?\n") -def find_newlines(text): + +def find_newlines(text: str) -> typing.Iterator[int]: + """Find position of newlines in a piece of text.""" return (m.end(0) for m in re_newline.finditer(text)) -def iter_lines(text): + +def iter_lines(text: str) -> typing.Iterator[tuple[int, str]]: start = 0 for m in re_newline.finditer(text): end = m.end(0) @@ -15,33 +21,42 @@ def iter_lines(text): if start < len(text) - 1: yield (start, text[start:]) -def censor_text(text): - def random_chr(): - return chr(random.randint(9728, 9983)) - return ''.join(random_chr() if c.isalnum() else c for c in text) -def add_highlight(text, span_start, span_length): +def censor_text(text: str) -> str: + """Censor text with a random character.""" + + def random_chr() -> str: + return chr(random.randint(9728, 9983)) + + return "".join(random_chr() if c.isalnum() else c for c in text) + + +def add_highlight(text: str, span_start: int, span_length: int): span_end = span_start + span_length for start, line in iter_lines(text): end = start + len(line) if not ((end > span_start) and (start < span_end)): - yield start, [{'text': line}] + yield start, [{"text": line}] continue within_line = span_start - start cur = [] if within_line > 0: before = line[:within_line] - cur.append({'text': before}) - cur.append({'highlight': line[max(within_line, 0):within_line + span_length]}) - after = line[within_line + span_length:] + cur.append({"text": before}) + cur.append({"highlight": line[max(within_line, 0) : within_line + span_length]}) + after = line[within_line + span_length :] if after: - cur.append({'text': after}) + cur.append({"text": after}) yield start, cur -def first_non_empty_line(text): + +def first_non_empty_line(text: str) -> str | None: + """First non-empty line.""" for start, cur in iter_lines(text): tidy = cur.strip() if tidy: return tidy + + return None diff --git a/sourcing/url.py b/sourcing/url.py index 4a1f693..a76a65b 100644 --- a/sourcing/url.py +++ b/sourcing/url.py @@ -5,6 +5,7 @@ import typing import requests from .model import Item +from .types import ExternalText project_dir = os.path.dirname(os.path.dirname(__file__)) cache_location = os.path.join(project_dir, "cache") @@ -17,15 +18,6 @@ def url_filename(url: str) -> str: return re_colon_slash.sub("_", url) -class ExternalText(typing.TypedDict): - """Text from external URL.""" - - url: str - text: str - heading: str - length: int - - def get_text(url: str) -> ExternalText: """Get text from URL and return as dict.""" # assume UTF-8 diff --git a/sourcing/view.py b/sourcing/view.py index 2b63681..96f60f1 100644 --- a/sourcing/view.py +++ b/sourcing/view.py @@ -108,10 +108,13 @@ def home() -> str: link_types = {item["link_type"] for item in docs_info if item.get("link_type")} + titles = XanaLink.get_all_titles() + return flask.render_template( "home.html", docs=docs, link_types=link_types, + titles=titles, years=years, docs_info=docs_info, nbsp_at_start=nbsp_at_start, @@ -120,7 +123,8 @@ def home() -> str: @bp.route("/password_reset", methods=["GET", "POST"]) -def password_reset(): +def password_reset() -> str | Response: + """Password reset.""" site_name = "perma.pub" # FIXME: move to config form = ForgotPasswordForm() if not form.validate_on_submit(): @@ -142,12 +146,13 @@ def password_reset(): @bp.route("/password_reset/sent", methods=["GET", "POST"]) -def password_reset_sent(): +def password_reset_sent() -> str: + """Password reset form.""" return flask.render_template("auth/password_reset_sent.html") @bp.route("/reset/<token>", methods=["GET", "POST"]) -def reset_with_token(token): +def reset_with_token(token: str) -> str | Response: ts = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"]) try: user_id = ts.loads(token, salt="password-reset", max_age=86400) @@ -173,7 +178,8 @@ def password_reset_complete() -> str: @bp.route("/source_doc_upload", methods=["POST"]) @show_errors -def source_doc_upload(): +def source_doc_upload() -> Response: + """Source doc upload.""" f = flask.request.files["sourcedoc_file"] text = f.read() doc = SourceDoc(text=text, user=current_user, filename=f.filename) @@ -220,7 +226,7 @@ def logout() -> Response: @bp.route("/signup", methods=["GET", "POST"]) -def signup(): +def signup() -> str | Response: if not flask.current_app.config.get("ALLOW_SIGNUP"): flask.abort(404) form = SignupForm() @@ -277,9 +283,10 @@ def view_edl(username: str, hashid: str) -> str: @bp.route("/<username>/<hashid>/realize") -def realize_edl(username, hashid): +def realize_edl(username: str, hashid: str) -> str: item = get_xanapage(username, hashid) + assert item.text is not None spans = list(fulfil_edl(item.text)) doc_text = "".join(span["text"] for span in spans) @@ -293,7 +300,7 @@ def realize_edl(username, hashid): @bp.route("/<username>/<hashid>/raw") -def view_item_raw(username: str, hashid: str): +def view_item_raw(username: str, hashid: str) -> str | Response: return view_item(username, hashid, raw=True) @@ -324,19 +331,29 @@ def fulfil_xanaflight(item: XanaLink) -> str: @bp.route("/<username>/<hashid>/fulfil") -def fulfil(username, hashid): +def fulfil(username: str, hashid: str) -> str: + """Fufil XanaPage or XanaLink.""" item = get_item(username, hashid) if item.type == "xanapage": + assert item.text is not None + return flask.render_template( "view/xanapage.html", item=item, doc=fulfil_edl_with_sources(item.text) ) - if item.type == "xanalink" and item.text.startswith("type=flight"): + if ( + item.type == "xanalink" + and item.text is not None + and item.text.startswith("type=flight") + ): + assert isinstance(item, XanaLink) return fulfil_xanaflight(item) + assert False + @bp.route("/<username>/<hashid>/set_title", methods=["POST"]) -def set_title(username, hashid): +def set_title(username: str, hashid: str) -> Response: item = get_item(username, hashid) has_title = item.has_title() item.set_title(flask.request.form["title"], current_user) @@ -345,7 +362,7 @@ def set_title(username, hashid): @bp.route("/<username>/<hashid>/delete", methods=["POST"]) -def delete_item(username, hashid): +def delete_item(username: str, hashid: str) -> Response: item = get_item(username, hashid) session.delete(item) session.commit() @@ -353,7 +370,8 @@ def delete_item(username, hashid): return redirect_to_home() -def save_new_xanalink(doc1, doc2): +def save_new_xanalink(doc1: XanaPage, doc2: XanaPage) -> None: + """Save XanaLink.""" start1 = flask.request.form["left_start"] length1 = flask.request.form["left_length"] start2 = flask.request.form["right_start"] @@ -385,7 +403,8 @@ def save_new_xanalink(doc1, doc2): @bp.route("/build_links", methods=["GET", "POST"]) -def build_links(): +def build_links() -> str | Response: + """Build links.""" doc1, doc2 = None, None hashid1, hashid2 = None, None if "doc1" in flask.request.args: @@ -396,12 +415,17 @@ def build_links(): doc2 = Item.get_by_hashid(hashid2) if flask.request.method == "POST": + assert ( + doc1 and doc2 and isinstance(doc1, XanaPage) and isinstance(doc2, XanaPage) + ) save_new_xanalink(doc1, doc2) + assert flask.request.endpoint return flask.redirect( flask.url_for(flask.request.endpoint, doc1=hashid1, doc2=hashid2) ) if doc1 and doc2: + assert isinstance(doc1, XanaPage) and isinstance(doc2, XanaPage) links = list({i for i in doc1.subjects} & {i for i in doc2.subjects}) else: links = [] @@ -479,7 +503,7 @@ def history(username: str, hashid: str) -> str: @bp.route("/<username>/<hashid>/as_xanapage", methods=["POST"]) -def create_xanapage_from_sourcedoc(username, hashid): +def create_xanapage_from_sourcedoc(username: str, hashid: str) -> Response: src_doc = get_source_doc(username, hashid) edl = "span: " + src_doc.entire_span + "\n" @@ -493,7 +517,7 @@ def create_xanapage_from_sourcedoc(username, hashid): @bp.route("/<username>/<hashid>/xanaedit", methods=["GET", "POST"]) -def xanaedit_item(username, hashid): +def xanaedit_item(username: str, hashid: str) -> str | Response: if flask.request.method == "POST": save_xanaedit(username, hashid) @@ -502,6 +526,7 @@ def xanaedit_item(username, hashid): ) doc = get_xanapage(username, hashid) + assert doc.text is not None spans = list(fulfil_edl(doc.text)) doc_text = "".join(span["text"] for span in spans) @@ -554,7 +579,7 @@ def finish_xanaedit(username: str, hashid: str) -> Response: @bp.route("/<username>/<hashid>/edit", methods=["GET", "POST"]) -def edit_item(username, hashid): +def edit_item(username: str, hashid: str) -> str | Response: obj = get_item(username, hashid) form = SourceDocForm(obj=obj) if form.validate_on_submit(): @@ -568,7 +593,7 @@ def edit_item(username, hashid): @bp.route("/source_doc_text/<int:source_doc_id>") -def source_doc_text(source_doc_id): +def source_doc_text(source_doc_id: int) -> str: doc = SourceDoc.query.get(source_doc_id) return flask.render_template("source_doc_text.html", doc=doc, iter_lines=iter_lines) @@ -605,7 +630,7 @@ def new_sourcedoc() -> str | Response: @bp.route("/new/xanalink/raw", methods=["GET", "POST"]) @login_required -def new_xanalink_raw(): +def new_xanalink_raw() -> str | Response: form = ItemForm() if form.validate_on_submit(): obj = XanaLink(user=current_user) @@ -625,8 +650,9 @@ def new_xanalink() -> str | Response: return flask.render_template("new_xanalink.html") data = flask.request.get_json() - lines = ["type=" + data["link_type"]] - for facet in data["facets"]: + assert isinstance(data, dict) + lines = ["type=" + typing.cast(str, data["link_type"])] + for facet in typing.cast(list[list[str]], data["facets"]): lines += ["facet="] + facet text = "".join(line + "\n" for line in lines) obj = XanaLink(user=current_user, text=text) @@ -668,6 +694,7 @@ def edit_source_document(filename: str) -> str | Response: @bp.route("/api/1/get/<username>/<filename>") def api_get_document(username: str, filename: str) -> Response: + """API to get a document, returns JSON.""" doc = get_source_doc(username, filename) assert doc.text is not None ret = {