Improvements

This commit is contained in:
Edward Betts 2023-08-29 11:00:24 -04:00
parent 51b09edbf5
commit fcc3afa4dc
9 changed files with 306 additions and 138 deletions

View file

@ -15,7 +15,7 @@ def init_db(db_url: str) -> None:
def get_engine(db_url: str) -> sqlalchemy.engine.base.Engine: def get_engine(db_url: str) -> sqlalchemy.engine.base.Engine:
"""Create an engine object.""" """Create an engine object."""
return create_engine(db_url, pool_recycle=3600) return create_engine(db_url, echo=False, pool_recycle=3600)
def init_app(app: flask.app.Flask) -> None: def init_app(app: flask.app.Flask) -> None:

View file

@ -81,7 +81,7 @@ def fulfil_edl_with_sources(
def fulfil_edl_with_links( def fulfil_edl_with_links(
edl: EDLDict, edl: EDLDict,
doc_num: str = "", doc_num: int | str = "",
links: list[dict[str, str]] | None = None, links: list[dict[str, str]] | None = None,
hide_all_transclusions: bool = False, hide_all_transclusions: bool = False,
): ):

View file

@ -1,82 +1,135 @@
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms.fields import StringField, PasswordField, BooleanField, HiddenField, TextAreaField, FileField, IntegerField from wtforms.fields import (
from wtforms.validators import InputRequired, Email, Length, ValidationError, Regexp, NoneOf, Optional BooleanField,
from .model import User, LoginError, re_username, reserved_name, user_exists FileField,
HiddenField,
IntegerField,
PasswordField,
StringField,
TextAreaField,
)
from wtforms.validators import (
Email,
InputRequired,
Length,
NoneOf,
Optional,
Regexp,
ValidationError,
)
from .model import LoginError, User, re_username, reserved_name, user_exists
PASSWORD_LEN = 64 PASSWORD_LEN = 64
EMAIL_LEN = 64 EMAIL_LEN = 64
class SignupForm(FlaskForm): class SignupForm(FlaskForm):
username = StringField('username', """Signup form."""
[InputRequired(),
Regexp(re_username), username = StringField(
NoneOf(reserved_name, message='Not available.'), "username",
Length(min=3, max=64)], [
[lambda name: name and name.replace(' ', '_')]) InputRequired(),
email = StringField('e-mail address', Regexp(re_username),
[InputRequired(), Email(), NoneOf(reserved_name, message="Not available."),
Length(min=5, max=EMAIL_LEN)], Length(min=3, max=64),
description="we never share your e-mail address") ],
password = StringField('password', [lambda name: name and name.replace(" ", "_")],
[InputRequired(), Length(min=4, max=PASSWORD_LEN)]) )
email = StringField(
"e-mail address",
[InputRequired(), Email(), Length(min=5, max=EMAIL_LEN)],
description="we never share your e-mail address",
)
password = StringField(
"password", [InputRequired(), Length(min=4, max=PASSWORD_LEN)]
)
def validate_username(form, field): def validate_username(form, field):
if user_exists(User.username, field.data): if user_exists(User.username, field.data):
raise ValidationError('Not available') raise ValidationError("Not available")
def validate_email(form, field): def validate_email(form, field):
if user_exists(User.email, field.data): if user_exists(User.email, field.data):
raise ValidationError('In use by another account') raise ValidationError("In use by another account")
class LoginForm(FlaskForm): class LoginForm(FlaskForm):
user_or_email = StringField('username or e-mail address', """Login form."""
[InputRequired(), Length(min=3, max=EMAIL_LEN)],
[lambda name: name and name.replace(' ', '_')])
password = PasswordField('password',
[InputRequired(), Length(max=PASSWORD_LEN)])
remember = BooleanField('stay logged in')
next = HiddenField('next')
def validate(self): user_or_email = StringField(
"username or e-mail address",
[InputRequired(), Length(min=3, max=EMAIL_LEN)],
[lambda name: name and name.replace(" ", "_")],
)
password = PasswordField("password", [InputRequired(), Length(max=PASSWORD_LEN)])
remember = BooleanField("stay logged in")
next = HiddenField("next")
def validate(self) -> bool:
"""Validate."""
rv = FlaskForm.validate(self) rv = FlaskForm.validate(self)
if not rv: if not rv:
return False return False
try: try:
self.user = User.attempt_login(self.user_or_email.data, self.user = User.attempt_login(self.user_or_email.data, self.password.data)
self.password.data)
return True return True
except LoginError as e: except LoginError as e:
self.user_or_email.errors.append(e.msg) self.user_or_email.errors.append(e.msg)
return False return False
class ForgotPasswordForm(FlaskForm): class ForgotPasswordForm(FlaskForm):
user_or_email = StringField('username or e-mail address', """Forgot password form."""
[InputRequired(), Length(max=EMAIL_LEN)])
user_or_email = StringField(
"username or e-mail address", [InputRequired(), Length(max=EMAIL_LEN)]
)
class PasswordForm(FlaskForm): class PasswordForm(FlaskForm):
password = PasswordField('new password', """Password form."""
[InputRequired(), Length(min=4, max=PASSWORD_LEN)])
password = PasswordField(
"new password", [InputRequired(), Length(min=4, max=PASSWORD_LEN)]
)
class AccountSettingsForm(FlaskForm): class AccountSettingsForm(FlaskForm):
full_name = StringField('full name', [Length(max=64)]) """Account settings form."""
email = StringField('e-mail address',
[InputRequired(), Email(), full_name = StringField("full name", [Length(max=64)])
Length(min=5, max=EMAIL_LEN)]) email = StringField(
"e-mail address", [InputRequired(), Email(), Length(min=5, max=EMAIL_LEN)]
)
class ChangePasswordForm(FlaskForm): class ChangePasswordForm(FlaskForm):
old_password = PasswordField('current password', """Change password form."""
[InputRequired(), Length(max=PASSWORD_LEN)])
new_password = PasswordField('new password', old_password = PasswordField(
[InputRequired(), Length(max=PASSWORD_LEN)]) "current password", [InputRequired(), Length(max=PASSWORD_LEN)]
)
new_password = PasswordField(
"new password", [InputRequired(), Length(max=PASSWORD_LEN)]
)
class SourceDocForm(FlaskForm): class SourceDocForm(FlaskForm):
text = TextAreaField('text', [InputRequired()]) """Source doc form."""
db_price_per_character = IntegerField('price per character', [Optional()])
db_document_price = IntegerField('document price', [Optional()]) text = TextAreaField("text", [InputRequired()])
db_price_per_character = IntegerField("price per character", [Optional()])
db_document_price = IntegerField("document price", [Optional()])
class ItemForm(FlaskForm): class ItemForm(FlaskForm):
text = TextAreaField('text', [InputRequired()]) """Item form."""
text = TextAreaField("text", [InputRequired()])
class UploadSourceDocForm(FlaskForm): class UploadSourceDocForm(FlaskForm):
sourcedoc_file = FileField('SourceDoc', [Regexp(r'^[^/\\]+\.txt$')]) sourcedoc_file = FileField("SourceDoc", [Regexp(r"^[^/\\]+\.txt$")])

View file

@ -27,7 +27,7 @@ from sqlalchemy_continuum.plugins import ActivityPlugin, FlaskPlugin
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
from .database import session from .database import session
from .parse import parse_link, parse_span from .parse import ParsedLink, SpanTuple, parse_link, parse_span
from .text import first_non_empty_line from .text import first_non_empty_line
activity_plugin = ActivityPlugin() activity_plugin = ActivityPlugin()
@ -79,27 +79,37 @@ re_username = re.compile(r"^\w+$", re.U)
re_full_name = re.compile(r'^([-.\'" ]|[^\W\d_])+$', re.U) re_full_name = re.compile(r'^([-.\'" ]|[^\W\d_])+$', re.U)
re_comment = re.compile(r"#.*") re_comment = re.compile(r"#.*")
AllTitles = dict["Item", str]
def item_url(): def item_url():
return url_for("view.view_item", username=self.user.username, hashid=self.hashid) return url_for("view.view_item", username=self.user.username, hashid=self.hashid)
def user_exists(field, value): def user_exists(field: Column[str], value: str) -> bool:
return session.query(exists().where(field == value)).scalar() """Check that user exists."""
return typing.cast(bool, session.query(exists().where(field == value)).scalar())
class TimeStampedModel(Base): class TimeStampedModel(Base):
"""Time stamped model."""
__abstract__ = True __abstract__ = True
created = Column(DateTime, default=func.now()) created = Column(DateTime, default=func.now())
modified = Column(DateTime, default=func.now(), onupdate=func.now()) modified = Column(DateTime, default=func.now(), onupdate=func.now())
class LoginError(Exception): class LoginError(Exception):
def __init__(self, msg): """Login error."""
def __init__(self, msg: str):
"""Init."""
self.msg = msg self.msg = msg
class User(TimeStampedModel, UserMixin): class User(TimeStampedModel, UserMixin):
"""User model."""
__tablename__ = "user" __tablename__ = "user"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
username = Column(Unicode(32), unique=True, nullable=False) username = Column(Unicode(32), unique=True, nullable=False)
@ -118,28 +128,35 @@ class User(TimeStampedModel, UserMixin):
user_name = synonym("username") user_name = synonym("username")
def __init__(self, **kwargs): def __init__(self, **kwargs):
"""Init."""
pw_hash = generate_password_hash(kwargs.pop("password")) pw_hash = generate_password_hash(kwargs.pop("password"))
return super(User, self).__init__(pw_hash=pw_hash, **kwargs) return super(User, self).__init__(pw_hash=pw_hash, **kwargs)
def __repr__(self): def __repr__(self) -> str:
"""Repr."""
return "<User: {!r}>".format(self.username) return "<User: {!r}>".format(self.username)
def set_password(self, password): def set_password(self, password: str) -> None:
"""Set password."""
self.pw_hash = generate_password_hash(password) self.pw_hash = generate_password_hash(password)
def check_password(self, password): def check_password(self, password: str) -> bool:
"""Check password."""
return check_password_hash(self.pw_hash, password) return check_password_hash(self.pw_hash, password)
def get_id(self): def get_id(self) -> int:
"""Get ID."""
return self.id return self.id
@validates("email") @validates("email")
def validate_email(self, key, value): def validate_email(self, key, value: str) -> str:
"""Validate email."""
assert "@" in value assert "@" in value
return value return value
@validates("username") @validates("username")
def validate_usernane(self, key, value): def validate_usernane(self, key, value):
"""Validate username."""
assert re_username.match(value) assert re_username.match(value)
return value return value
@ -150,21 +167,24 @@ class User(TimeStampedModel, UserMixin):
return value return value
@hybrid_property @hybrid_property
def is_live(self): def is_live(self) -> bool:
return self.email_verified & ~self.disabled & ~self.deleted """User account is live."""
return bool(self.email_verified & ~self.disabled & ~self.deleted)
@classmethod @classmethod
def lookup_user_or_email(cls, user_or_email): def lookup_user_or_email(cls, user_or_email: str) -> User:
"""Lookup user or email."""
field = cls.email if "@" in user_or_email else cls.username field = cls.email if "@" in user_or_email else cls.username
return cls.query.filter(field == user_or_email).one_or_none() return typing.cast(User, cls.query.filter(field == user_or_email).one_or_none())
@property @property
def mail_to_name(self): def mail_to_name(self) -> str:
"""Name to use on e-mails sent to the user.""" """Name to use on e-mails sent to the user."""
return self.full_name or self.username return self.full_name or self.username
@classmethod @classmethod
def attempt_login(cls, user_or_email, password): def attempt_login(cls, user_or_email: str, password: str) -> User:
"""Attempt login."""
user = cls.lookup_user_or_email(user_or_email) user = cls.lookup_user_or_email(user_or_email)
if not user: if not user:
raise LoginError("user not found") raise LoginError("user not found")
@ -208,7 +228,7 @@ class Item(TimeStampedModel):
primaryjoin=id == Reference.subject_id, primaryjoin=id == Reference.subject_id,
secondaryjoin=id == Reference.object_id, secondaryjoin=id == Reference.object_id,
) )
user = relationship("User", backref="items") user = relationship("User", backref="items", lazy="select")
__mapper_args__ = { __mapper_args__ = {
"polymorphic_on": type, "polymorphic_on": type,
@ -218,7 +238,7 @@ class Item(TimeStampedModel):
@property @property
def hashid(self) -> str: def hashid(self) -> str:
"""Hashid for item.""" """Hashid for item."""
return doc_hashids.encode(self.id) return typing.cast(str, doc_hashids.encode(self.id))
@classmethod @classmethod
def get_by_hashid(cls, hashid: str) -> Item | None: def get_by_hashid(cls, hashid: str) -> Item | None:
@ -229,7 +249,7 @@ class Item(TimeStampedModel):
return None return None
return typing.cast("Item", cls.query.get(item_id)) return typing.cast("Item", cls.query.get(item_id))
def view_url(self, endpoint, **kwargs) -> str: def view_url(self, endpoint: str, **kwargs) -> str:
return url_for( return url_for(
"view." + endpoint, "view." + endpoint,
username=self.user.username, username=self.user.username,
@ -239,21 +259,27 @@ class Item(TimeStampedModel):
@property @property
def url(self) -> str: def url(self) -> str:
"""URL for to view this item."""
return self.view_url("view_item") return self.view_url("view_item")
def url_fragment(self): def url_fragment(self) -> str:
"""URL fragment."""
return self.user.username + "/" + self.hashid return self.user.username + "/" + self.hashid
def version_url(self, version): def version_url(self, version: int) -> str:
"""URL for version."""
return self.view_url("view_item", v=version) return self.view_url("view_item", v=version)
@property @property
def history_url(self): def history_url(self) -> str:
"""History URL."""
return self.view_url("history") return self.view_url("history")
@property @property
def external_url(self): def external_url(self) -> str:
"""External URL."""
base_url = current_app.config.get("BASE_URL") base_url = current_app.config.get("BASE_URL")
assert base_url and isinstance(base_url, str)
if not base_url.endswith("/"): if not base_url.endswith("/"):
base_url += "/" base_url += "/"
if base_url: if base_url:
@ -262,19 +288,23 @@ class Item(TimeStampedModel):
return self.view_url("view_item", _external=True) return self.view_url("view_item", _external=True)
@property @property
def edit_url(self): def edit_url(self) -> str:
"""Edit URL."""
return self.view_url("edit_item") return self.view_url("edit_item")
@property @property
def set_title_url(self): def set_title_url(self) -> str:
"""Set title URL."""
return self.view_url("set_title") return self.view_url("set_title")
def title_from_link(self, titles=None): def title_from_link(self, titles: AllTitles | None = None) -> str | None:
if not titles: """Get title from link."""
if titles is None:
titles = XanaLink.get_all_titles() titles = XanaLink.get_all_titles()
return titles.get(self) return titles.get(self)
def title(self, titles=None): def title(self, titles: AllTitles | None = None) -> str:
"""Get title."""
return self.type + ": " + (self.title_from_link(titles) or self.hashid) return self.type + ": " + (self.title_from_link(titles) or self.hashid)
def has_title(self) -> bool: def has_title(self) -> bool:
@ -282,7 +312,8 @@ class Item(TimeStampedModel):
titles = XanaLink.get_all_titles() titles = XanaLink.get_all_titles()
return self in titles return self in titles
def set_title(self, title, user): def set_title(self, title: str, user: User) -> None:
"""Set item title."""
title_source_doc = SourceDoc(text=title, user=user) title_source_doc = SourceDoc(text=title, user=user)
session.add(title_source_doc) session.add(title_source_doc)
session.commit() session.commit()
@ -299,8 +330,20 @@ span: {},start=0,length={}""".format(
session.commit() session.commit()
@classmethod @classmethod
def from_external(cls, url: str, home: str | None = None) -> None | "Item": def from_external(cls, url: str, home: str | None = None) -> None | Item:
"""Get item from URL.""" """Get item from URL."""
username: str | None
hashid: str | None
parts = url.split("/")
username, hashid = parts[-2:]
item_id = doc_hashids.decode(hashid)[0]
q = cls.query.filter(User.username == username, cls.id == item_id)
item = q.one_or_none()
if item:
return typing.cast(Item, item)
base = current_app.config.get("BASE_URL") base = current_app.config.get("BASE_URL")
username, hashid = None, None username, hashid = None, None
if home is None: if home is None:
@ -329,18 +372,23 @@ class XanaPage(Item):
id = Column(Integer, ForeignKey(Item.id), primary_key=True) id = Column(Integer, ForeignKey(Item.id), primary_key=True)
def snippet(self): def snippet(self) -> str:
"""Snippet of text."""
return self.text return self.text
@property @property
def xanaedit_url(self): def xanaedit_url(self) -> str:
"""XanaExit URL."""
return self.view_url("xanaedit_item") return self.view_url("xanaedit_item")
@property @property
def save_xanaedit_url(self): def save_xanaedit_url(self) -> str:
"""XanaExit save URL."""
return self.view_url("save_xanaedit") return self.view_url("save_xanaedit")
def iter_spans(self): def iter_spans(self) -> typing.Iterator[SpanTuple]:
"""Span iterator."""
assert self.text is not None
for line in self.text.splitlines(): for line in self.text.splitlines():
line = re_comment.sub("", line).strip() line = re_comment.sub("", line).strip()
if not line: if not line:
@ -364,25 +412,32 @@ class XanaPage(Item):
class XanaLink(Item): class XanaLink(Item):
"""XanaLink."""
__tablename__ = "xanalink" __tablename__ = "xanalink"
__mapper_args__ = {"polymorphic_identity": "xanalink"} __mapper_args__ = {"polymorphic_identity": "xanalink"}
id = Column(Integer, ForeignKey(Item.id), primary_key=True) id = Column(Integer, ForeignKey(Item.id), primary_key=True)
def parse(self): def parse(self) -> ParsedLink:
"""Parse link."""
assert self.text is not None
return parse_link(self.text) return parse_link(self.text)
@property @property
def link_type(self): def link_type(self) -> str:
"""Get link type."""
return self.parse()["type"] return self.parse()["type"]
def title(self, titles=None): def title(self, titles: AllTitles | None = None) -> str:
"""Title of link."""
if titles is None: if titles is None:
titles = XanaLink.get_all_titles() titles = XanaLink.get_all_titles()
if self in titles: if self in titles:
return self.type + ": " + titles[self] return self.type + ": " + titles[self]
parsed = self.parse() parsed = self.parse()
assert isinstance(parsed["type"], str)
if parsed["type"] == "title": if parsed["type"] == "title":
ident = parsed["facets"][0][0].partition(": ")[2] ident = parsed["facets"][0][0].partition(": ")[2]
@ -418,7 +473,8 @@ class XanaLink(Item):
return (item, source_of_title.text[start : length + start]) return (item, source_of_title.text[start : length + start])
@classmethod @classmethod
def get_all_titles(cls, home: str | None = None) -> dict["Item", str]: def get_all_titles(cls, home: str | None = None) -> AllTitles:
"""Get all known titles."""
titles = {} titles = {}
for link in cls.query: for link in cls.query:
ret = link.item_and_title(home) ret = link.item_and_title(home)
@ -428,7 +484,8 @@ class XanaLink(Item):
titles[item] = title titles[item] = title
return titles return titles
def snippet(self): def snippet(self) -> str | None:
"""Snippet of text."""
return self.text return self.text
@ -442,13 +499,23 @@ class SourceDoc(Item):
@property @property
def document_price(self): def document_price(self):
assert self.text is not None
return self.db_document_price or self.db_price_per_character * len(self.text) return self.db_document_price or self.db_price_per_character * len(self.text)
@property @property
def price_per_character(self): def price_per_character(self):
assert self.text is not None
return self.db_price_per_character or self.db_document_price / len(self.text) return self.db_price_per_character or self.db_document_price / len(self.text)
def snippet(self, length=255, killwords=False, end="...", leeway=5): def snippet(
self,
length: int = 255,
killwords: bool = False,
end: str = "...",
leeway: int = 5,
) -> str:
"""Get snippet of text."""
assert self.text is not None
s = self.text s = self.text
assert length >= len(end), "expected length >= %s, got %s" % (len(end), length) assert length >= len(end), "expected length >= %s, got %s" % (len(end), length)
assert leeway >= 0, "expected leeway >= 0, got %s" % leeway assert leeway >= 0, "expected leeway >= 0, got %s" % leeway
@ -459,26 +526,33 @@ class SourceDoc(Item):
result = s[: length - len(end)].rsplit(" ", 1)[0] result = s[: length - len(end)].rsplit(" ", 1)[0]
return result + end return result + end
def raw_title(self): def raw_title(self) -> str:
"""Raw title."""
return self.title(with_type=False) return self.title(with_type=False)
def title(self, titles=None, with_type=True): def title(self, titles: AllTitles | None = None, with_type: bool = True) -> str:
"""Source document title."""
start = self.type + ": " if with_type else "" start = self.type + ": " if with_type else ""
titles = XanaLink.get_all_titles() if titles is None:
titles = XanaLink.get_all_titles()
from_link = self.title_from_link(titles=titles) from_link = self.title_from_link(titles=titles)
if from_link: if from_link:
return start + from_link return start + from_link
assert self.text is not None
first_line = first_non_empty_line(self.text) first_line = first_non_empty_line(self.text)
if first_line: if first_line:
return start + first_line return start + first_line
return start + self.hashid return start + self.hashid
@property @property
def create_xanapage_url(self): def create_xanapage_url(self) -> str:
"""Create xanapage URL."""
return self.view_url("create_xanapage_from_sourcedoc") return self.view_url("create_xanapage_from_sourcedoc")
@property @property
def entire_span(self): def entire_span(self) -> str:
"""Entire span."""
assert self.text is not None
return self.external_url + f",start=0,length={len(self.text)}" return self.external_url + f",start=0,length={len(self.text)}"

View file

@ -3,7 +3,7 @@ import re
import typing import typing
from html import escape from html import escape
from .url import ExternalText from .types import ExternalText
re_span_pointer = re.compile(r"span: (.*),start=(\d+),length=(\d+)") re_span_pointer = re.compile(r"span: (.*),start=(\d+),length=(\d+)")
re_xanalink = re.compile(r"xanalink: +([^ ]+) *$") re_xanalink = re.compile(r"xanalink: +([^ ]+) *$")
@ -28,6 +28,8 @@ SpanTuple = tuple[str, int, int]
SourceText = dict[str, ExternalText] SourceText = dict[str, ExternalText]
ParsedLink = dict[str, str | None | list[list[str]]]
def parse_span(line: str) -> None | SpanTuple: def parse_span(line: str) -> None | SpanTuple:
"""Parse a span.""" """Parse a span."""
@ -118,7 +120,7 @@ def parse_xanapage_facet(facet: list[str]) -> str:
return leg[len(prefix) :] return leg[len(prefix) :]
def parse_link(link_text: str) -> dict[str, str | None | list[list[str]]]: def parse_link(link_text: str) -> ParsedLink:
link_type = None link_type = None
expect = "link_type" expect = "link_type"
facets = [] facets = []

View file

@ -68,8 +68,12 @@
{% for doc in docs %} {% for doc in docs %}
<div data-id="{{ doc.id }}" class="card border-primary my-2"> <div data-id="{{ doc.id }}" class="card border-primary my-2">
<h5 class="card-header"><a href="{{ doc.url }}">{{ doc.title() }}</a> <h5 class="card-header">
&mdash; {{ doc.user.username }} &mdash; {{ doc.created | datetime }}</h5> <a href="{{ doc.url }}">{{ doc.title(titles=titles) }}</a>
{#
&mdash; {{ doc.user.username }} &mdash; {{ doc.created | datetime }}
#}
</h5>
<div class="card-body"> <div class="card-body">
<p class="card-text"> <p class="card-text">
{%- for line in doc.snippet().splitlines() -%} {%- for line in doc.snippet().splitlines() -%}
@ -79,6 +83,7 @@
</div> </div>
</div> </div>
{% endfor %} {% endfor %}
<div id="lower-buttons"> <div id="lower-buttons">
{{ new_buttons() }} {{ new_buttons() }}
</div> </div>

View file

@ -1,12 +1,18 @@
import re """Text utility functions."""
import random import random
import re
import typing
re_newline = re.compile('\r?\n') re_newline = re.compile("\r?\n")
def find_newlines(text):
def find_newlines(text: str) -> typing.Iterator[int]:
"""Find position of newlines in a piece of text."""
return (m.end(0) for m in re_newline.finditer(text)) return (m.end(0) for m in re_newline.finditer(text))
def iter_lines(text):
def iter_lines(text: str) -> typing.Iterator[tuple[int, str]]:
start = 0 start = 0
for m in re_newline.finditer(text): for m in re_newline.finditer(text):
end = m.end(0) end = m.end(0)
@ -15,33 +21,42 @@ def iter_lines(text):
if start < len(text) - 1: if start < len(text) - 1:
yield (start, text[start:]) yield (start, text[start:])
def censor_text(text):
def random_chr():
return chr(random.randint(9728, 9983))
return ''.join(random_chr() if c.isalnum() else c for c in text)
def add_highlight(text, span_start, span_length): def censor_text(text: str) -> str:
"""Censor text with a random character."""
def random_chr() -> str:
return chr(random.randint(9728, 9983))
return "".join(random_chr() if c.isalnum() else c for c in text)
def add_highlight(text: str, span_start: int, span_length: int):
span_end = span_start + span_length span_end = span_start + span_length
for start, line in iter_lines(text): for start, line in iter_lines(text):
end = start + len(line) end = start + len(line)
if not ((end > span_start) and (start < span_end)): if not ((end > span_start) and (start < span_end)):
yield start, [{'text': line}] yield start, [{"text": line}]
continue continue
within_line = span_start - start within_line = span_start - start
cur = [] cur = []
if within_line > 0: if within_line > 0:
before = line[:within_line] before = line[:within_line]
cur.append({'text': before}) cur.append({"text": before})
cur.append({'highlight': line[max(within_line, 0):within_line + span_length]}) cur.append({"highlight": line[max(within_line, 0) : within_line + span_length]})
after = line[within_line + span_length:] after = line[within_line + span_length :]
if after: if after:
cur.append({'text': after}) cur.append({"text": after})
yield start, cur yield start, cur
def first_non_empty_line(text):
def first_non_empty_line(text: str) -> str | None:
"""First non-empty line."""
for start, cur in iter_lines(text): for start, cur in iter_lines(text):
tidy = cur.strip() tidy = cur.strip()
if tidy: if tidy:
return tidy return tidy
return None

View file

@ -5,6 +5,7 @@ import typing
import requests import requests
from .model import Item from .model import Item
from .types import ExternalText
project_dir = os.path.dirname(os.path.dirname(__file__)) project_dir = os.path.dirname(os.path.dirname(__file__))
cache_location = os.path.join(project_dir, "cache") cache_location = os.path.join(project_dir, "cache")
@ -17,15 +18,6 @@ def url_filename(url: str) -> str:
return re_colon_slash.sub("_", url) return re_colon_slash.sub("_", url)
class ExternalText(typing.TypedDict):
"""Text from external URL."""
url: str
text: str
heading: str
length: int
def get_text(url: str) -> ExternalText: def get_text(url: str) -> ExternalText:
"""Get text from URL and return as dict.""" """Get text from URL and return as dict."""
# assume UTF-8 # assume UTF-8

View file

@ -108,10 +108,13 @@ def home() -> str:
link_types = {item["link_type"] for item in docs_info if item.get("link_type")} link_types = {item["link_type"] for item in docs_info if item.get("link_type")}
titles = XanaLink.get_all_titles()
return flask.render_template( return flask.render_template(
"home.html", "home.html",
docs=docs, docs=docs,
link_types=link_types, link_types=link_types,
titles=titles,
years=years, years=years,
docs_info=docs_info, docs_info=docs_info,
nbsp_at_start=nbsp_at_start, nbsp_at_start=nbsp_at_start,
@ -120,7 +123,8 @@ def home() -> str:
@bp.route("/password_reset", methods=["GET", "POST"]) @bp.route("/password_reset", methods=["GET", "POST"])
def password_reset(): def password_reset() -> str | Response:
"""Password reset."""
site_name = "perma.pub" # FIXME: move to config site_name = "perma.pub" # FIXME: move to config
form = ForgotPasswordForm() form = ForgotPasswordForm()
if not form.validate_on_submit(): if not form.validate_on_submit():
@ -142,12 +146,13 @@ def password_reset():
@bp.route("/password_reset/sent", methods=["GET", "POST"]) @bp.route("/password_reset/sent", methods=["GET", "POST"])
def password_reset_sent(): def password_reset_sent() -> str:
"""Password reset form."""
return flask.render_template("auth/password_reset_sent.html") return flask.render_template("auth/password_reset_sent.html")
@bp.route("/reset/<token>", methods=["GET", "POST"]) @bp.route("/reset/<token>", methods=["GET", "POST"])
def reset_with_token(token): def reset_with_token(token: str) -> str | Response:
ts = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"]) ts = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"])
try: try:
user_id = ts.loads(token, salt="password-reset", max_age=86400) user_id = ts.loads(token, salt="password-reset", max_age=86400)
@ -173,7 +178,8 @@ def password_reset_complete() -> str:
@bp.route("/source_doc_upload", methods=["POST"]) @bp.route("/source_doc_upload", methods=["POST"])
@show_errors @show_errors
def source_doc_upload(): def source_doc_upload() -> Response:
"""Source doc upload."""
f = flask.request.files["sourcedoc_file"] f = flask.request.files["sourcedoc_file"]
text = f.read() text = f.read()
doc = SourceDoc(text=text, user=current_user, filename=f.filename) doc = SourceDoc(text=text, user=current_user, filename=f.filename)
@ -220,7 +226,7 @@ def logout() -> Response:
@bp.route("/signup", methods=["GET", "POST"]) @bp.route("/signup", methods=["GET", "POST"])
def signup(): def signup() -> str | Response:
if not flask.current_app.config.get("ALLOW_SIGNUP"): if not flask.current_app.config.get("ALLOW_SIGNUP"):
flask.abort(404) flask.abort(404)
form = SignupForm() form = SignupForm()
@ -277,9 +283,10 @@ def view_edl(username: str, hashid: str) -> str:
@bp.route("/<username>/<hashid>/realize") @bp.route("/<username>/<hashid>/realize")
def realize_edl(username, hashid): def realize_edl(username: str, hashid: str) -> str:
item = get_xanapage(username, hashid) item = get_xanapage(username, hashid)
assert item.text is not None
spans = list(fulfil_edl(item.text)) spans = list(fulfil_edl(item.text))
doc_text = "".join(span["text"] for span in spans) doc_text = "".join(span["text"] for span in spans)
@ -293,7 +300,7 @@ def realize_edl(username, hashid):
@bp.route("/<username>/<hashid>/raw") @bp.route("/<username>/<hashid>/raw")
def view_item_raw(username: str, hashid: str): def view_item_raw(username: str, hashid: str) -> str | Response:
return view_item(username, hashid, raw=True) return view_item(username, hashid, raw=True)
@ -324,19 +331,29 @@ def fulfil_xanaflight(item: XanaLink) -> str:
@bp.route("/<username>/<hashid>/fulfil") @bp.route("/<username>/<hashid>/fulfil")
def fulfil(username, hashid): def fulfil(username: str, hashid: str) -> str:
"""Fufil XanaPage or XanaLink."""
item = get_item(username, hashid) item = get_item(username, hashid)
if item.type == "xanapage": if item.type == "xanapage":
assert item.text is not None
return flask.render_template( return flask.render_template(
"view/xanapage.html", item=item, doc=fulfil_edl_with_sources(item.text) "view/xanapage.html", item=item, doc=fulfil_edl_with_sources(item.text)
) )
if item.type == "xanalink" and item.text.startswith("type=flight"): if (
item.type == "xanalink"
and item.text is not None
and item.text.startswith("type=flight")
):
assert isinstance(item, XanaLink)
return fulfil_xanaflight(item) return fulfil_xanaflight(item)
assert False
@bp.route("/<username>/<hashid>/set_title", methods=["POST"]) @bp.route("/<username>/<hashid>/set_title", methods=["POST"])
def set_title(username, hashid): def set_title(username: str, hashid: str) -> Response:
item = get_item(username, hashid) item = get_item(username, hashid)
has_title = item.has_title() has_title = item.has_title()
item.set_title(flask.request.form["title"], current_user) item.set_title(flask.request.form["title"], current_user)
@ -345,7 +362,7 @@ def set_title(username, hashid):
@bp.route("/<username>/<hashid>/delete", methods=["POST"]) @bp.route("/<username>/<hashid>/delete", methods=["POST"])
def delete_item(username, hashid): def delete_item(username: str, hashid: str) -> Response:
item = get_item(username, hashid) item = get_item(username, hashid)
session.delete(item) session.delete(item)
session.commit() session.commit()
@ -353,7 +370,8 @@ def delete_item(username, hashid):
return redirect_to_home() return redirect_to_home()
def save_new_xanalink(doc1, doc2): def save_new_xanalink(doc1: XanaPage, doc2: XanaPage) -> None:
"""Save XanaLink."""
start1 = flask.request.form["left_start"] start1 = flask.request.form["left_start"]
length1 = flask.request.form["left_length"] length1 = flask.request.form["left_length"]
start2 = flask.request.form["right_start"] start2 = flask.request.form["right_start"]
@ -385,7 +403,8 @@ def save_new_xanalink(doc1, doc2):
@bp.route("/build_links", methods=["GET", "POST"]) @bp.route("/build_links", methods=["GET", "POST"])
def build_links(): def build_links() -> str | Response:
"""Build links."""
doc1, doc2 = None, None doc1, doc2 = None, None
hashid1, hashid2 = None, None hashid1, hashid2 = None, None
if "doc1" in flask.request.args: if "doc1" in flask.request.args:
@ -396,12 +415,17 @@ def build_links():
doc2 = Item.get_by_hashid(hashid2) doc2 = Item.get_by_hashid(hashid2)
if flask.request.method == "POST": if flask.request.method == "POST":
assert (
doc1 and doc2 and isinstance(doc1, XanaPage) and isinstance(doc2, XanaPage)
)
save_new_xanalink(doc1, doc2) save_new_xanalink(doc1, doc2)
assert flask.request.endpoint
return flask.redirect( return flask.redirect(
flask.url_for(flask.request.endpoint, doc1=hashid1, doc2=hashid2) flask.url_for(flask.request.endpoint, doc1=hashid1, doc2=hashid2)
) )
if doc1 and doc2: if doc1 and doc2:
assert isinstance(doc1, XanaPage) and isinstance(doc2, XanaPage)
links = list({i for i in doc1.subjects} & {i for i in doc2.subjects}) links = list({i for i in doc1.subjects} & {i for i in doc2.subjects})
else: else:
links = [] links = []
@ -479,7 +503,7 @@ def history(username: str, hashid: str) -> str:
@bp.route("/<username>/<hashid>/as_xanapage", methods=["POST"]) @bp.route("/<username>/<hashid>/as_xanapage", methods=["POST"])
def create_xanapage_from_sourcedoc(username, hashid): def create_xanapage_from_sourcedoc(username: str, hashid: str) -> Response:
src_doc = get_source_doc(username, hashid) src_doc = get_source_doc(username, hashid)
edl = "span: " + src_doc.entire_span + "\n" edl = "span: " + src_doc.entire_span + "\n"
@ -493,7 +517,7 @@ def create_xanapage_from_sourcedoc(username, hashid):
@bp.route("/<username>/<hashid>/xanaedit", methods=["GET", "POST"]) @bp.route("/<username>/<hashid>/xanaedit", methods=["GET", "POST"])
def xanaedit_item(username, hashid): def xanaedit_item(username: str, hashid: str) -> str | Response:
if flask.request.method == "POST": if flask.request.method == "POST":
save_xanaedit(username, hashid) save_xanaedit(username, hashid)
@ -502,6 +526,7 @@ def xanaedit_item(username, hashid):
) )
doc = get_xanapage(username, hashid) doc = get_xanapage(username, hashid)
assert doc.text is not None
spans = list(fulfil_edl(doc.text)) spans = list(fulfil_edl(doc.text))
doc_text = "".join(span["text"] for span in spans) doc_text = "".join(span["text"] for span in spans)
@ -554,7 +579,7 @@ def finish_xanaedit(username: str, hashid: str) -> Response:
@bp.route("/<username>/<hashid>/edit", methods=["GET", "POST"]) @bp.route("/<username>/<hashid>/edit", methods=["GET", "POST"])
def edit_item(username, hashid): def edit_item(username: str, hashid: str) -> str | Response:
obj = get_item(username, hashid) obj = get_item(username, hashid)
form = SourceDocForm(obj=obj) form = SourceDocForm(obj=obj)
if form.validate_on_submit(): if form.validate_on_submit():
@ -568,7 +593,7 @@ def edit_item(username, hashid):
@bp.route("/source_doc_text/<int:source_doc_id>") @bp.route("/source_doc_text/<int:source_doc_id>")
def source_doc_text(source_doc_id): def source_doc_text(source_doc_id: int) -> str:
doc = SourceDoc.query.get(source_doc_id) doc = SourceDoc.query.get(source_doc_id)
return flask.render_template("source_doc_text.html", doc=doc, iter_lines=iter_lines) return flask.render_template("source_doc_text.html", doc=doc, iter_lines=iter_lines)
@ -605,7 +630,7 @@ def new_sourcedoc() -> str | Response:
@bp.route("/new/xanalink/raw", methods=["GET", "POST"]) @bp.route("/new/xanalink/raw", methods=["GET", "POST"])
@login_required @login_required
def new_xanalink_raw(): def new_xanalink_raw() -> str | Response:
form = ItemForm() form = ItemForm()
if form.validate_on_submit(): if form.validate_on_submit():
obj = XanaLink(user=current_user) obj = XanaLink(user=current_user)
@ -625,8 +650,9 @@ def new_xanalink() -> str | Response:
return flask.render_template("new_xanalink.html") return flask.render_template("new_xanalink.html")
data = flask.request.get_json() data = flask.request.get_json()
lines = ["type=" + data["link_type"]] assert isinstance(data, dict)
for facet in data["facets"]: lines = ["type=" + typing.cast(str, data["link_type"])]
for facet in typing.cast(list[list[str]], data["facets"]):
lines += ["facet="] + facet lines += ["facet="] + facet
text = "".join(line + "\n" for line in lines) text = "".join(line + "\n" for line in lines)
obj = XanaLink(user=current_user, text=text) obj = XanaLink(user=current_user, text=text)
@ -668,6 +694,7 @@ def edit_source_document(filename: str) -> str | Response:
@bp.route("/api/1/get/<username>/<filename>") @bp.route("/api/1/get/<username>/<filename>")
def api_get_document(username: str, filename: str) -> Response: def api_get_document(username: str, filename: str) -> Response:
"""API to get a document, returns JSON."""
doc = get_source_doc(username, filename) doc = get_source_doc(username, filename)
assert doc.text is not None assert doc.text is not None
ret = { ret = {