diff --git a/sourcing/__init__.py b/sourcing/__init__.py index aa2e424..a8694de 100644 --- a/sourcing/__init__.py +++ b/sourcing/__init__.py @@ -1,12 +1,14 @@ from flask import Flask -from . import database -from . import view + +from . import database, view from .utils import display_datetime -def create_app(config): + +def create_app(config: str) -> Flask: + """Create the application.""" app = Flask(__name__) app.config.from_object(config) database.init_app(app) view.init_app(app) - app.jinja_env.filters['datetime'] = display_datetime + app.jinja_env.filters["datetime"] = display_datetime return app diff --git a/sourcing/cli.py b/sourcing/cli.py index 60a53b9..b5dde45 100644 --- a/sourcing/cli.py +++ b/sourcing/cli.py @@ -1,43 +1,50 @@ -from . import create_app, model, database, edl from pprint import pprint + import click -app = create_app('config.default') +from . import create_app, database, edl, model + +app = create_app("config.default") + @app.cli.command() -@click.argument('user_or_email') -@click.option('--password', prompt=True, hide_input=True) +@click.argument("user_or_email") +@click.option("--password", prompt=True, hide_input=True) def reset_password(user_or_email, password): user = model.User.lookup_user_or_email(user_or_email) user.set_password(password) database.session.commit() - print(f'password updated for {user.username} ({user.email})') + print(f"password updated for {user.username} ({user.email})") + @app.cli.command() -@click.argument('hashid') +@click.argument("hashid") def parse_link(hashid): - home = 'http://localhost:5000/' + home = "http://localhost:5000/" item = model.Item.get_by_hashid(hashid) pprint(item.parse()) print(item.item_and_title(home)) + @app.cli.command() def all_titles(): - home = 'http://localhost:5000/' + home = "http://localhost:5000/" titles = model.XanaLink.get_all_titles(home=home) print(titles.values()) + @app.cli.command() -@click.argument('hashid') +@click.argument("hashid") def delete_item(hashid): item = model.Item.get_by_hashid(hashid) database.session.delete(item) database.session.commit() + @app.cli.command() def populate_references(): - home = 'http://localhost:5000/' + home = "http://localhost:5000/" seen = set() for ref in model.Reference.query: @@ -45,14 +52,14 @@ def populate_references(): for link_obj in model.XanaLink.query: link = link_obj.parse() - for items in link['facets']: + for items in link["facets"]: for i in items: - k, _, v = i.partition(': ') - if k == 'span' and ',' in v: - v = v.partition(',')[0] + k, _, v = i.partition(": ") + if k == "span" and "," in v: + v = v.partition(",")[0] item = model.Item.from_external(v, home=home) if item: - print(link_obj.id, '->', item.id) + print(link_obj.id, "->", item.id) as_tuple = (link_obj.id, item.id) if as_tuple in seen: continue @@ -62,13 +69,13 @@ def populate_references(): for xanapage in model.XanaPage.query: doc_edl = edl.parse_edl(xanapage.text) - if 'spans' not in doc_edl or not doc_edl['spans']: + if "spans" not in doc_edl or not doc_edl["spans"]: continue - for url, start, length in doc_edl['spans']: + for url, start, length in doc_edl["spans"]: src_doc = model.Item.from_external(url, home=home) if not src_doc.id: continue - print(xanapage.id, '->', src_doc.id) + print(xanapage.id, "->", src_doc.id) as_tuple = (xanapage.id, src_doc.id) if as_tuple in seen: continue @@ -78,10 +85,11 @@ def populate_references(): database.session.commit() + @app.cli.command() -@click.argument('hashid') +@click.argument("hashid") def show_references(hashid): item = model.Item.get_by_hashid(hashid) - print('item_id:', item.id) - print('subjects:', [i.id for i in item.subjects]) - print('objects:', [i.id for i in item.objects]) + print("item_id:", item.id) + print("subjects:", [i.id for i in item.subjects]) + print("objects:", [i.id for i in item.objects]) diff --git a/sourcing/database.py b/sourcing/database.py index fe92862..81406e3 100644 --- a/sourcing/database.py +++ b/sourcing/database.py @@ -1,18 +1,28 @@ +"""Database.""" + +import flask +import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker session = scoped_session(sessionmaker()) -def init_db(db_url): + +def init_db(db_url: str) -> None: + """Initialise databsae.""" session.configure(bind=get_engine(db_url)) -def get_engine(db_url): + +def get_engine(db_url: str) -> sqlalchemy.engine.base.Engine: + """Create an engine object.""" return create_engine(db_url, pool_recycle=3600) -def init_app(app): - db_url = app.config['DB_URL'] + +def init_app(app: flask.app.Flask) -> None: + """Initialise database connection within flask app.""" + db_url = app.config["DB_URL"] session.configure(bind=get_engine(db_url)) @app.teardown_appcontext - def shutdown_session(exception=None): + def shutdown_session(exception: Exception | None = None) -> None: session.remove() diff --git a/sourcing/edit.py b/sourcing/edit.py index 5f6cf1d..0fb7abf 100644 --- a/sourcing/edit.py +++ b/sourcing/edit.py @@ -1,25 +1,43 @@ +"""Edit.""" + +import typing + import attr -class EditOutOfRange(Exception): - pass +from .span import Span -def apply_delete(current_spans, edit): + +class EditOutOfRange(Exception): + """Edit out of range.""" + + +class Edit(typing.TypedDict): + """Edit.""" + + start: int + old: str + span: Span + op: str + + +def apply_delete(current_spans: list[Span], edit: Edit) -> list[Span]: + """Apply delete.""" assert edit if not current_spans: - raise ValueError('edit is out of bounds') + raise ValueError("edit is out of bounds") spans = [] pos = 0 - edit_end = edit['start'] + len(edit['old']) + edit_end = edit["start"] + len(edit["old"]) cur_span = current_spans.pop(0) - while pos + cur_span.length < edit['start']: + while pos + cur_span.length < edit["start"]: spans.append(cur_span) pos += cur_span.length cur_span = current_spans.pop(0) - if edit['start'] > pos: - new_span = attr.evolve(cur_span, length=edit['start'] - pos) + if edit["start"] > pos: + new_span = attr.evolve(cur_span, length=edit["start"] - pos) spans.append(new_span) while pos + cur_span.length < edit_end: @@ -30,28 +48,27 @@ def apply_delete(current_spans, edit): offset = cur_span.start - pos new_start = offset + (edit_end - pos) diff = new_start - cur_span.start - new_span = attr.evolve(cur_span, - length=cur_span.length - diff, - start=new_start) + new_span = attr.evolve(cur_span, length=cur_span.length - diff, start=new_start) spans.append(new_span) spans += current_spans return spans -def apply_insert(current_spans, edit): - if not current_spans and edit['0'] == 0: - return edit['span'] + +def apply_insert(current_spans: list[Span], edit: Edit) -> Span | list[Span]: + if not current_spans and edit["0"] == 0: + return edit["span"] pos = 0 spans = [] cur_span = current_spans.pop(0) - while pos + cur_span.length < edit['start']: + while pos + cur_span.length < edit["start"]: spans.append(cur_span) pos += cur_span.length cur_span = current_spans.pop(0) - if edit['start'] >= pos: - length_a = edit['start'] - pos + if edit["start"] >= pos: + length_a = edit["start"] - pos length_b = cur_span.length - length_a if length_a: @@ -59,35 +76,34 @@ def apply_insert(current_spans, edit): pos += length_a spans.append(span_a) - spans.append(edit['span']) - pos += edit['span'].length + spans.append(edit["span"]) + pos += edit["span"].length if length_b: - span_b = attr.evolve(cur_span, - start=cur_span.start + length_a, - length=length_b) + span_b = attr.evolve( + cur_span, start=cur_span.start + length_a, length=length_b + ) spans.append(span_b) pos += length_b else: - spans.append(edit['span']) + spans.append(edit["span"]) spans += current_spans return spans -def apply_edits(spans, edits): + +def apply_edits(spans: list[Span], edits: list[Edit]) -> list[Span]: for edit in edits: - if edit['op'] == 'delete': + if edit["op"] == "delete": spans = apply_delete(spans, edit) continue - if edit['op'] == 'insert': + if edit["op"] == "insert": spans = apply_insert(spans, edit) continue - if edit['op'] == 'replace': + if edit["op"] == "replace": spans = apply_delete(spans, edit) spans = apply_insert(spans, edit) continue return spans - - diff --git a/sourcing/edl.py b/sourcing/edl.py index 830f7e1..f31a28c 100644 --- a/sourcing/edl.py +++ b/sourcing/edl.py @@ -1,88 +1,126 @@ -from .url import get_url, get_text -from .parse import get_span, parse_span, parse_link, parse_sourcedoc_facet, xanapage_span_html, span_html, get_urls +"""Edit decision list.""" + +import re +import typing from collections import defaultdict from html import escape from pprint import pprint + +from .parse import ( + SourceText, + SpanContents, + get_span, + get_urls, + parse_link, + parse_sourcedoc_facet, + parse_span, + span_html, + xanapage_span_html, +) +from .url import get_text, get_url from .utils import protect_start_spaces -import re - -re_comment = re.compile(r'#.*') -re_xanalink = re.compile('xanalink: +([^ ]+) *$') +re_comment = re.compile(r"#.*") +re_xanalink = re.compile("xanalink: +([^ ]+) *$") max_sourcedoc_size = 600000 -def fulfil_edl(edl): - text = {} - for url, start, length in parse_edl(edl)['spans']: + +def fulfil_edl(edl: str) -> typing.Iterator[SpanContents]: + """Yeild each span of an EDL.""" + text: SourceText = {} + for url, start, length in parse_edl(edl)["spans"]: if url not in text: text[url] = get_text(url) yield get_span(text, url, start, length) -def parse_edl(edl_text): - edl = { - 'spans': [], - 'links': [], + +class EDLDict(typing.TypedDict): + """Dict representing an EDL.""" + + spans: list[tuple[str, int, int]] + links: list[dict[str, typing.Any]] + + +def parse_edl(edl_text: str) -> EDLDict: + """Parse an EDL and return contents.""" + edl: EDLDict = { + "spans": [], + "links": [], } for line in edl_text.splitlines(): - line = re_comment.sub('', line).strip() + line = re_comment.sub("", line).strip() if not line: continue span_pointer = parse_span(line) if span_pointer: - edl['spans'].append(span_pointer) + edl["spans"].append(span_pointer) continue m = re_xanalink.match(line) if m: link_url = m.group(1) - edl['links'].append({ - 'url': link_url, - 'text': get_url(link_url), - }) + edl["links"].append( + { + "url": link_url, + "text": get_url(link_url), + } + ) continue return edl -def fulfil_edl_with_sources(edl_text, links=None, hide_all_transclusions=False): - edl = parse_edl(edl_text) - return fulfil_edl_with_links(edl, - links=links, - hide_all_transclusions=hide_all_transclusions) -def fulfil_edl_with_links(edl, doc_num='', links=None, hide_all_transclusions=False): - spans = edl['spans'] +def fulfil_edl_with_sources( + edl_text: str, links=None, hide_all_transclusions: bool = False +): + edl = parse_edl(edl_text) + return fulfil_edl_with_links( + edl, links=links, hide_all_transclusions=hide_all_transclusions + ) + + +def fulfil_edl_with_links( + edl: EDLDict, + doc_num: str = "", + links: list[dict[str, str]] | None = None, + hide_all_transclusions: bool = False, +): + spans = edl["spans"] hide_transclusions = set() two_facet_links = [] if not links: - links = [parse_link(link['text']) for link in edl['links']] + links = [parse_link(link["text"]) for link in edl["links"]] link_num = 0 for link in links: - if link['type'] == 'HideTransclusions': - hide_transclusions.add(parse_sourcedoc_facet(link['facets'][0])) - elif len(link['facets']) == 2: - two_facet_links.append((link_num, [parse_span(span[0]) for span in link['facets']])) + if link["type"] == "HideTransclusions": + hide_transclusions.add(parse_sourcedoc_facet(link["facets"][0])) + elif len(link["facets"]) == 2: + two_facet_links.append( + (link_num, [parse_span(span[0]) for span in link["facets"]]) + ) link_num += 1 source = [get_text(url) for url in get_urls(spans)] - source_text = {s['url']: s['text'] for s in source} + source_text = {s["url"]: s["text"] for s in source} source_doc_links = defaultdict(list) for link_num, facets in two_facet_links: for facet_num, span in enumerate(facets): + assert span url, start, length = span - source_doc_links[url].append((start, length, link_num, 'link', facet_num)) + source_doc_links[url].append((start, length, link_num, "link", facet_num)) if url in source_text: continue s = get_text(url) source.append(s) - source_text[s['url']] = s['text'] + source_text[s["url"]] = s["text"] for s in source_doc_links.values(): s.sort() @@ -93,57 +131,67 @@ def fulfil_edl_with_links(edl, doc_num='', links=None, hide_all_transclusions=Fa for num, (url, start, length) in spans: highlight = not hide_all_transclusions and url not in hide_transclusions span_text = source_text[url] # [start:start + length] - new_text = '' + new_text = "" pos = start - for link_start, link_len, link_num, span_type, facet_num in source_doc_links[url]: + for link_start, link_len, link_num, span_type, facet_num in source_doc_links[ + url + ]: link_end = link_start + link_len if link_start >= start + length: break if link_end < start: continue - cls = 'xanapagelink link' - link_span = (f'' + - escape(span_text[link_start:link_end]) + - '') + cls = "xanapagelink link" + link_span = ( + f'' + + escape(span_text[link_start:link_end]) + + "" + ) new_text += escape(span_text[pos:link_start]) + link_span pos = link_end - new_text += escape(span_text[pos:start + length]) + new_text += escape(span_text[pos : start + length]) cur = xanapage_span_html(num, new_text, url, start, length, highlight=highlight) doc_spans.append(cur) - doc = ''.join(doc_spans) + doc = "".join(doc_spans) for s in source: - text = protect_start_spaces(s.pop('text')) - if s['length'] > max_sourcedoc_size: + text = protect_start_spaces(s.pop("text")) + if s["length"] > max_sourcedoc_size: # print('{} > {}'.format(s['length'], max_sourcedoc_size)) continue - if s['url'] in hide_transclusions: + if s["url"] in hide_transclusions: continue - source_spans = [(start, length, num, 'transclusion', 0) for num, (url, start, length) in spans if url == s['url']] - source_spans += source_doc_links[s['url']] + source_spans = [ + (start, length, num, "transclusion", 0) + for num, (url, start, length) in spans + if url == s["url"] + ] + source_spans += source_doc_links[s["url"]] source_spans.sort() - new_text = '' + new_text = "" pos = 0 pprint(source_spans) for start, length, num, span_type, _ in source_spans: end = start + length - new_text += (escape(text[pos:start]) + - span_html(span_type, num) + - escape(text[start:end]) + - '') + new_text += ( + escape(text[pos:start]) + + span_html(span_type, num) + + escape(text[start:end]) + + "" + ) pos = end new_text += escape(text[pos:]) - new_text = new_text.replace('\n', '
\n') + new_text = new_text.replace("\n", "
\n") - s['text'] = new_text + s["text"] = new_text return { - 'source': source, - 'doc': doc.replace('\n', '
\n'), - 'span_count': len(spans), - 'link_count': len(two_facet_links), + "source": source, + "doc": doc.replace("\n", "
\n"), + "span_count": len(spans), + "link_count": len(two_facet_links), } diff --git a/sourcing/mail.py b/sourcing/mail.py index 77a96c6..f99ca77 100644 --- a/sourcing/mail.py +++ b/sourcing/mail.py @@ -1,40 +1,49 @@ -from flask import render_template, current_app -from email.mime.text import MIMEText -from email.utils import formatdate, make_msgid -from email import charset -from email.utils import formataddr +"""Send email.""" + import smtplib +from email import charset +from email.mime.text import MIMEText +from email.utils import formataddr, formatdate, make_msgid -charset.add_charset('utf-8', charset.SHORTEST, charset.QP) +from flask import current_app, render_template -def format_message(user, subject, body): - from_name = current_app.config['FROM_NAME'] - from_addr = current_app.config['FROM_ADDR'] +from .model import User - msg = MIMEText(body, 'plain', 'UTF-8') - msg['Subject'] = subject - msg['To'] = formataddr((user.mail_to_name, user.email)) - msg['From'] = formataddr((from_name, from_addr)) - msg['Date'] = formatdate() - msg['Message-ID'] = make_msgid() +charset.add_charset("utf-8", charset.SHORTEST, charset.QP) + + +def format_message(user: User, subject: str, body: str) -> MIMEText: + """Format an email.""" + from_name = current_app.config["FROM_NAME"] + from_addr = current_app.config["FROM_ADDR"] + + msg = MIMEText(body, "plain", "UTF-8") + msg["Subject"] = subject + msg["To"] = formataddr((user.mail_to_name, user.email)) + msg["From"] = formataddr((from_name, from_addr)) + msg["Date"] = formatdate() + msg["Message-ID"] = make_msgid() return msg -def send_mail(user, subject, body): - bounce_addr = current_app.config['FROM_ADDR'] + +def send_mail(user: User, subject: str, body: str) -> None: + """Send an email.""" + bounce_addr = current_app.config["FROM_ADDR"] msg = format_message(user, subject, body) msg_as_string = msg.as_string() - if not current_app.config['REALLY_SEND_MAIL']: # during development + if not current_app.config["REALLY_SEND_MAIL"]: # during development return - s = smtplib.SMTP('localhost') + s = smtplib.SMTP("localhost") s.sendmail(bounce_addr, [user.email], msg_as_string) s.quit() -def send_signup_mail(user): - ''' unused so far ''' - subject = u'xanadu: verify your account' - body = render_template('mail/signup.txt', user=user) + +def send_signup_mail(user: User) -> None: + """Unused so far.""" + subject = "xanadu: verify your account" + body = render_template("mail/signup.txt", user=user) send_mail(user, subject, body) diff --git a/sourcing/model.py b/sourcing/model.py index 08775aa..beb8024 100644 --- a/sourcing/model.py +++ b/sourcing/model.py @@ -1,20 +1,34 @@ -from flask import url_for, current_app -from .database import session -from .parse import parse_link, parse_sourcedoc_facet, parse_span -from .text import first_non_empty_line -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy import Column, ForeignKey -from sqlalchemy.types import String, Unicode, Integer, DateTime, Boolean, UnicodeText, Enum -from sqlalchemy import func -from sqlalchemy.orm import relationship, validates, synonym, configure_mappers -from sqlalchemy.sql import exists -from flask_login import UserMixin -from werkzeug.security import generate_password_hash, check_password_hash -from sqlalchemy.ext.hybrid import hybrid_property -from sqlalchemy_continuum import make_versioned -from sqlalchemy_continuum.plugins import FlaskPlugin, ActivityPlugin +"""Models.""" + +from __future__ import annotations + import re +import typing + +from flask import current_app, url_for +from flask_login import UserMixin from hashids import Hashids +from sqlalchemy import Column, ForeignKey, func +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.ext.hybrid import hybrid_property +from sqlalchemy.orm import configure_mappers, relationship, synonym, validates +from sqlalchemy.sql import exists +from sqlalchemy.types import ( + Boolean, + DateTime, + Enum, + Integer, + String, + Unicode, + UnicodeText, +) +from sqlalchemy_continuum import make_versioned +from sqlalchemy_continuum.plugins import ActivityPlugin, FlaskPlugin +from werkzeug.security import check_password_hash, generate_password_hash + +from .database import session +from .parse import parse_link, parse_span +from .text import first_non_empty_line activity_plugin = ActivityPlugin() make_versioned(plugins=[FlaskPlugin(), activity_plugin]) @@ -24,40 +38,69 @@ doc_hashids = Hashids(min_length=8) Base = declarative_base() Base.query = session.query_property() -re_server_url = re.compile(r'^http://perma.pub/\d+/([^/]+)/([^/]+)$') +re_server_url = re.compile(r"^http://perma.pub/\d+/([^/]+)/([^/]+)$") # list of disallowed usernames - maybe this should be in the database -reserved_name = ['root', 'admin', 'administrator', 'support', 'info', - 'test', 'tech', 'online', 'old', 'new', 'jobs', 'login', 'job', 'ipad' - 'iphone', 'javascript', 'script', 'host', 'mail', 'image', 'faq', - 'file', 'ftp', 'error', 'warning', 'the', 'assistance', 'maintenance', - 'controller', 'head', 'chief', 'anon'] +reserved_name = [ + "root", + "admin", + "administrator", + "support", + "info", + "test", + "tech", + "online", + "old", + "new", + "jobs", + "login", + "job", + "ipad" "iphone", + "javascript", + "script", + "host", + "mail", + "image", + "faq", + "file", + "ftp", + "error", + "warning", + "the", + "assistance", + "maintenance", + "controller", + "head", + "chief", + "anon", +] -re_username = re.compile(r'^\w+$', re.U) +re_username = re.compile(r"^\w+$", re.U) re_full_name = re.compile(r'^([-.\'" ]|[^\W\d_])+$', re.U) -re_comment = re.compile(r'#.*') +re_comment = re.compile(r"#.*") def item_url(): - return url_for('view.view_item', - username=self.user.username, - hashid=self.hashid) + return url_for("view.view_item", username=self.user.username, hashid=self.hashid) def user_exists(field, value): return session.query(exists().where(field == value)).scalar() + class TimeStampedModel(Base): __abstract__ = True created = Column(DateTime, default=func.now()) modified = Column(DateTime, default=func.now(), onupdate=func.now()) + class LoginError(Exception): def __init__(self, msg): self.msg = msg + class User(TimeStampedModel, UserMixin): - __tablename__ = 'user' + __tablename__ = "user" id = Column(Integer, primary_key=True) username = Column(Unicode(32), unique=True, nullable=False) pw_hash = Column(String(160), nullable=False) @@ -70,16 +113,16 @@ class User(TimeStampedModel, UserMixin): full_name = Column(Unicode(64)) balance = Column(Integer, nullable=False, default=0) - user_id = synonym('id') - name = synonym('full_name') - user_name = synonym('username') + user_id = synonym("id") + name = synonym("full_name") + user_name = synonym("username") def __init__(self, **kwargs): - pw_hash = generate_password_hash(kwargs.pop('password')) + pw_hash = generate_password_hash(kwargs.pop("password")) return super(User, self).__init__(pw_hash=pw_hash, **kwargs) def __repr__(self): - return ''.format(self.username) + return "".format(self.username) def set_password(self, password): self.pw_hash = generate_password_hash(password) @@ -90,17 +133,17 @@ class User(TimeStampedModel, UserMixin): def get_id(self): return self.id - @validates('email') + @validates("email") def validate_email(self, key, value): - assert '@' in value + assert "@" in value return value - @validates('username') + @validates("username") def validate_usernane(self, key, value): assert re_username.match(value) return value - @validates('full_name') + @validates("full_name") def validate_full_name(self, key, value): if value: assert re_full_name.match(value) @@ -112,108 +155,119 @@ class User(TimeStampedModel, UserMixin): @classmethod def lookup_user_or_email(cls, user_or_email): - field = cls.email if '@' in user_or_email else cls.username + field = cls.email if "@" in user_or_email else cls.username return cls.query.filter(field == user_or_email).one_or_none() @property def mail_to_name(self): - '''Name to use on e-mails sent to the user.''' + """Name to use on e-mails sent to the user.""" return self.full_name or self.username @classmethod def attempt_login(cls, user_or_email, password): user = cls.lookup_user_or_email(user_or_email) if not user: - raise LoginError('user not found') + raise LoginError("user not found") if user.disabled: - raise LoginError('user account disabled') + raise LoginError("user account disabled") if not user.check_password(password): - raise LoginError('incorrect password') + raise LoginError("incorrect password") return user + class Reference(Base): - __tablename__ = 'reference' - subject_id = Column(Integer, ForeignKey('item.id'), primary_key=True) - object_id = Column(Integer, ForeignKey('item.id'), primary_key=True) + __tablename__ = "reference" + subject_id = Column(Integer, ForeignKey("item.id"), primary_key=True) + object_id = Column(Integer, ForeignKey("item.id"), primary_key=True) + class Item(TimeStampedModel): - __tablename__ = 'item' - __versioned__ = {'base_classes': (TimeStampedModel,)} + __tablename__ = "item" + __versioned__ = {"base_classes": (TimeStampedModel,)} id = Column(Integer, primary_key=True) - user_id = Column(Integer, ForeignKey('user.id')) + user_id = Column(Integer, ForeignKey("user.id")) published = Column(DateTime) - type = Column(Enum('sourcedoc', 'xanapage', 'xanalink', name='item_type'), - nullable=False) + type = Column( + Enum("sourcedoc", "xanapage", "xanalink", name="item_type"), nullable=False + ) filename = Column(Unicode) text = Column(UnicodeText) - subjects = relationship('Item', - lazy='dynamic', - secondary='reference', - primaryjoin=id == Reference.object_id, - secondaryjoin=id == Reference.subject_id) - objects = relationship('Item', - lazy='dynamic', - secondary='reference', - primaryjoin=id == Reference.subject_id, - secondaryjoin=id == Reference.object_id) - user = relationship('User', backref='items') + subjects = relationship( + "Item", + lazy="dynamic", + secondary="reference", + primaryjoin=id == Reference.object_id, + secondaryjoin=id == Reference.subject_id, + ) + objects = relationship( + "Item", + lazy="dynamic", + secondary="reference", + primaryjoin=id == Reference.subject_id, + secondaryjoin=id == Reference.object_id, + ) + user = relationship("User", backref="items") __mapper_args__ = { - 'polymorphic_on': type, - 'with_polymorphic': '*', + "polymorphic_on": type, + "with_polymorphic": "*", } @property - def hashid(self): + def hashid(self) -> str: + """Hashid for item.""" return doc_hashids.encode(self.id) @classmethod - def get_by_hashid(cls, hashid): + def get_by_hashid(cls, hashid: str) -> Item | None: + """Return the item with the given hashid.""" try: item_id = doc_hashids.decode(hashid)[0] except IndexError: - return - return cls.query.get(item_id) + return None + return typing.cast("Item", cls.query.get(item_id)) - def view_url(self, endpoint, **kwargs): - return url_for('view.' + endpoint, - username=self.user.username, - hashid=self.hashid, - **kwargs) + def view_url(self, endpoint, **kwargs) -> str: + return url_for( + "view." + endpoint, + username=self.user.username, + hashid=self.hashid, + **kwargs, + ) @property - def url(self): - return self.view_url('view_item') + def url(self) -> str: + return self.view_url("view_item") def url_fragment(self): - return self.user.username + '/' + self.hashid + return self.user.username + "/" + self.hashid def version_url(self, version): - return self.view_url('view_item', v=version) + return self.view_url("view_item", v=version) @property def history_url(self): - return self.view_url('history') + return self.view_url("history") @property def external_url(self): - base_url = current_app.config.get('BASE_URL') - if not base_url.endswith('/'): - base_url += '/' + base_url = current_app.config.get("BASE_URL") + if not base_url.endswith("/"): + base_url += "/" if base_url: return base_url + self.url_fragment() else: - return self.view_url('view_item', _external=True) + return self.view_url("view_item", _external=True) @property def edit_url(self): - return self.view_url('edit_item') + return self.view_url("edit_item") @property def set_title_url(self): - return self.view_url('set_title') + return self.view_url("set_title") def title_from_link(self, titles=None): if not titles: @@ -221,9 +275,10 @@ class Item(TimeStampedModel): return titles.get(self) def title(self, titles=None): - return self.type + ': ' + (self.title_from_link(titles) or self.hashid) + return self.type + ": " + (self.title_from_link(titles) or self.hashid) - def has_title(self): + def has_title(self) -> bool: + """Item has a title.""" titles = XanaLink.get_all_titles() return self in titles @@ -231,42 +286,46 @@ class Item(TimeStampedModel): title_source_doc = SourceDoc(text=title, user=user) session.add(title_source_doc) session.commit() - link_text = '''type=title + link_text = """type=title facet= sourcedoc: {} facet= -span: {},start=0,length={}'''.format(self.external_url, title_source_doc.external_url, len(title)) +span: {},start=0,length={}""".format( + self.external_url, title_source_doc.external_url, len(title) + ) title_link = XanaLink(text=link_text, user=user) session.add(title_link) session.commit() @classmethod - def from_external(cls, url, home=None): - base = current_app.config.get('BASE_URL') + def from_external(cls, url: str, home: str | None = None) -> None | "Item": + """Get item from URL.""" + base = current_app.config.get("BASE_URL") username, hashid = None, None if home is None: - home = url_for('view.home', _external=True) + home = url_for("view.home", _external=True) if url.startswith(home): - username, _, hashid = url[len(home):].partition('/') + username, _, hashid = url[len(home) :].partition("/") elif base and url.startswith(base): - username, _, hashid = url[len(base):].lstrip('/').partition('/') + username, _, hashid = url[len(base) :].lstrip("/").partition("/") - if username and '/' in username or hashid and '/' in hashid: + if username and "/" in username or hashid and "/" in hashid: username, hashid = None, None if not username or not hashid: m = re_server_url.match(url) if not m: - return + return None username, hashid = m.groups() item_id = doc_hashids.decode(hashid)[0] q = cls.query.filter(User.username == username, cls.id == item_id) return q.one_or_none() + class XanaPage(Item): - __tablename__ = 'xanapage' - __mapper_args__ = {'polymorphic_identity': 'xanapage'} + __tablename__ = "xanapage" + __mapper_args__ = {"polymorphic_identity": "xanapage"} id = Column(Integer, ForeignKey(Item.id), primary_key=True) @@ -275,22 +334,23 @@ class XanaPage(Item): @property def xanaedit_url(self): - return self.view_url('xanaedit_item') + return self.view_url("xanaedit_item") @property def save_xanaedit_url(self): - return self.view_url('save_xanaedit') + return self.view_url("save_xanaedit") def iter_spans(self): for line in self.text.splitlines(): - line = re_comment.sub('', line).strip() + line = re_comment.sub("", line).strip() if not line: continue span_pointer = parse_span(line) if span_pointer: yield span_pointer - def update_references(self): + def update_references(self) -> None: + """Update references.""" for url, start, length in self.iter_spans(): src_doc = Item.from_external(url) if not src_doc or not src_doc.id: @@ -304,8 +364,8 @@ class XanaPage(Item): class XanaLink(Item): - __tablename__ = 'xanalink' - __mapper_args__ = {'polymorphic_identity': 'xanalink'} + __tablename__ = "xanalink" + __mapper_args__ = {"polymorphic_identity": "xanalink"} id = Column(Integer, ForeignKey(Item.id), primary_key=True) @@ -314,38 +374,38 @@ class XanaLink(Item): @property def link_type(self): - return self.parse()['type'] + return self.parse()["type"] def title(self, titles=None): if titles is None: titles = XanaLink.get_all_titles() if self in titles: - return self.type + ': ' + titles[self] + return self.type + ": " + titles[self] parsed = self.parse() - if parsed['type'] == 'title': - ident = parsed['facets'][0][0].partition(': ')[2] + if parsed["type"] == "title": + ident = parsed["facets"][0][0].partition(": ")[2] item = Item.from_external(ident) if item in titles: - return parsed['type'] + " link for " + item.title(titles=titles) + return parsed["type"] + " link for " + item.title(titles=titles) - if parsed['type']: - return parsed['type'] + " link: " + self.hashid + if parsed["type"]: + return parsed["type"] + " link: " + self.hashid else: return "link: " + self.hashid def item_and_title(self, home=None): link = self.parse() - if link['type'] != 'title': + if link["type"] != "title": return try: - facet1, facet2 = link['facets'] + facet1, facet2 = link["facets"] except ValueError: return - link_type, _, ident = facet1[0].partition(': ') + link_type, _, ident = facet1[0].partition(": ") item = Item.from_external(ident, home) try: @@ -355,10 +415,10 @@ class XanaLink(Item): source_of_title = SourceDoc.from_external(ident2, home) if source_of_title: - return(item, source_of_title.text[start:length + start]) + return (item, source_of_title.text[start : length + start]) @classmethod - def get_all_titles(cls, home=None): + def get_all_titles(cls, home: str | None = None) -> dict["Item", str]: titles = {} for link in cls.query: ret = link.item_and_title(home) @@ -371,9 +431,10 @@ class XanaLink(Item): def snippet(self): return self.text + class SourceDoc(Item): - __tablename__ = 'sourcedoc' - __mapper_args__ = {'polymorphic_identity': 'sourcedoc'} + __tablename__ = "sourcedoc" + __mapper_args__ = {"polymorphic_identity": "sourcedoc"} id = Column(Integer, ForeignKey(Item.id), primary_key=True) db_price_per_character = Column(Integer) @@ -387,22 +448,22 @@ class SourceDoc(Item): def price_per_character(self): return self.db_price_per_character or self.db_document_price / len(self.text) - def snippet(self, length=255, killwords=False, end='...', leeway=5): + def snippet(self, length=255, killwords=False, end="...", leeway=5): s = self.text - assert length >= len(end), 'expected length >= %s, got %s' % (len(end), length) - assert leeway >= 0, 'expected leeway >= 0, got %s' % leeway + assert length >= len(end), "expected length >= %s, got %s" % (len(end), length) + assert leeway >= 0, "expected leeway >= 0, got %s" % leeway if len(s) <= length + leeway: return s if killwords: - return s[:length - len(end)] + end - result = s[:length - len(end)].rsplit(' ', 1)[0] + return s[: length - len(end)] + end + result = s[: length - len(end)].rsplit(" ", 1)[0] return result + end def raw_title(self): return self.title(with_type=False) def title(self, titles=None, with_type=True): - start = self.type + ': ' if with_type else '' + start = self.type + ": " if with_type else "" titles = XanaLink.get_all_titles() from_link = self.title_from_link(titles=titles) if from_link: @@ -414,11 +475,11 @@ class SourceDoc(Item): @property def create_xanapage_url(self): - return self.view_url('create_xanapage_from_sourcedoc') + return self.view_url("create_xanapage_from_sourcedoc") @property def entire_span(self): - return self.external_url + f',start=0,length={len(self.text)}' + return self.external_url + f",start=0,length={len(self.text)}" configure_mappers() diff --git a/sourcing/parse.py b/sourcing/parse.py index 7421674..eef5a9f 100644 --- a/sourcing/parse.py +++ b/sourcing/parse.py @@ -1,34 +1,58 @@ -import re import os.path +import re +import typing from html import escape -re_span_pointer = re.compile(r'span: (.*),start=(\d+),length=(\d+)') -re_xanalink = re.compile('xanalink: +([^ ]+) *$') -re_facet = re.compile('^facet\d* *=\s*(.*)\s*$') -re_comment = re.compile(r'#.*') +from .url import ExternalText + +re_span_pointer = re.compile(r"span: (.*),start=(\d+),length=(\d+)") +re_xanalink = re.compile(r"xanalink: +([^ ]+) *$") +re_facet = re.compile(r"^facet\d* *=\s*(.*)\s*$") +re_comment = re.compile(r"#.*") project_dir = os.path.dirname(os.path.dirname(__file__)) xnb_per_char = 150000 -def parse_span(line): + +class SpanContents(typing.TypedDict): + """Contents of a span.""" + + url: str + start: int + length: int + text: str + + +SpanTuple = tuple[str, int, int] + +SourceText = dict[str, ExternalText] + + +def parse_span(line: str) -> None | SpanTuple: + """Parse a span.""" m = re_span_pointer.match(line) if not m: return None return (m.group(1), int(m.group(2)), int(m.group(3))) -def get_span(text, url, start, length): + +def get_span(text: SourceText, url: str, start: int, length: int) -> SpanContents: + """Get span from source text.""" return { - 'url': url, - 'start': start, - 'length': length, - 'text': text[url]['text'][start:start + length] + "url": url, + "start": start, + "length": length, + "text": text[url]["text"][start : start + length], } -def get_urls(spans): + +def get_urls(spans: list[SpanTuple]) -> set[str]: + """Get URLs from span tuples.""" return {i[0] for i in spans} -def find_min_max(spans, source): + +def find_min_max(spans: list[SpanTuple], source): text_min, text_max = {}, {} for url, start, length in spans: if url in text_min: @@ -42,67 +66,85 @@ def find_min_max(spans, source): text_max[url] = start + length for s in source: - url = s['url'] - s['min'] = text_min[url] - s['max'] = text_max[url] + url = s["url"] + s["min"] = text_min[url] + s["max"] = text_max[url] -def span_html(span_type, num): - return ''.format(num=num, span_type=span_type) -def xanapage_span_html(num, text, url, start, length, highlight=True, censor=False): +def span_html(span_type: str, num: int) -> str: + """Open span tag.""" + return f'' + + +def xanapage_span_html( + num: int, + text: str, + url: str, + start: int, + length: int, + highlight: bool = True, + censor: bool = False, +) -> str: + """Generate HTML to represent a span.""" cls = [] if highlight: - cls = ['xanapagetransclusion', 'transclusion'] - html_class = ' class="{}"'.format(' '.join(cls)) if cls else '' + cls = ["xanapagetransclusion", "transclusion"] + html_class = f""" class="{' '.join(cls)}""" if cls else "" - html = '{}'.format(num, html_class, escape(url), start, length, text) + html = ( + f'{text}' + ) if censor: - return '' + html + '' + return '' + html + "" else: return html -def parse_sourcedoc_facet(facet): - leg = facet[0] - prefix = 'sourcedoc: ' - assert leg.startswith(prefix) - return leg[len(prefix):] -def parse_xanapage_facet(facet): +def parse_sourcedoc_facet(facet: list[str]) -> str: + """Parse sourcedoc facet.""" leg = facet[0] - prefix = 'xanapage: ' + prefix = "sourcedoc: " assert leg.startswith(prefix) - return leg[len(prefix):] + return leg[len(prefix) :] -def parse_link(link_text): + +def parse_xanapage_facet(facet: list[str]) -> str: + """Parse xanapage facet.""" + leg = facet[0] + prefix = "xanapage: " + assert leg.startswith(prefix) + return leg[len(prefix) :] + + +def parse_link(link_text: str) -> dict[str, str | None | list[list[str]]]: link_type = None - expect = 'link_type' + expect = "link_type" facets = [] for line in link_text.splitlines(): - line = re_comment.sub('', line).strip() + line = re_comment.sub("", line).strip() if not line: continue - if expect == 'link_type': - if line.startswith('type='): + if expect == "link_type": + if line.startswith("type="): link_type = line[5:] - expect = 'facets' + expect = "facets" continue - if expect != 'facets': + if expect != "facets": # print("unrecognized:", line) continue m = re_facet.match(line) if m: - legs = [] + legs: list[str] = [] facets.append(legs) if m.group(1): line = m.group(1) else: continue - if legs and legs[-1] == 'span:' and line.startswith('http'): - legs[-1] += ' ' + line + if legs and legs[-1] == "span:" and line.startswith("http"): + legs[-1] += " " + line else: legs.append(line.strip()) - return {'type': link_type, 'facets': facets} - - + return {"type": link_type, "facets": facets} diff --git a/sourcing/span.py b/sourcing/span.py index 5d0a6e4..9c3b164 100644 --- a/sourcing/span.py +++ b/sourcing/span.py @@ -1,21 +1,35 @@ +"""Span.""" + +import typing + import attr +import attr._make -def greater_than_zero(instance, attribute, value): + +def greater_than_zero(instance: "Span", attribute: typing.Any, value: int) -> None: + """Value is greater than zero.""" if value <= 0: - raise ValueError('must be greater than 0') + raise ValueError("must be greater than 0") -def is_positive(instance, attribute, value): + +def is_positive(instance: "Span", attribute: typing.Any, value: int) -> None: + """Value is positive.""" if value < 0: - raise ValueError('must be positive') + raise ValueError("must be positive") + @attr.s class Span: - url: int = attr.ib() + """Span.""" + + url: str = attr.ib() start: int = attr.ib(validator=is_positive) length: int = attr.ib(validator=greater_than_zero) def end(self) -> int: + """End position of span.""" return self.start + self.length def for_edl(self) -> str: - return f'{self.url},start={self.start},length={self.length}' + """Generate URL parameters for EDL.""" + return f"{self.url},start={self.start},length={self.length}" diff --git a/sourcing/static/bootstrap b/sourcing/static/bootstrap index fe0f86b..12ef71f 120000 --- a/sourcing/static/bootstrap +++ b/sourcing/static/bootstrap @@ -1 +1 @@ -/usr/share/javascript/bootstrap \ No newline at end of file +/usr/share/javascript/bootstrap4 \ No newline at end of file diff --git a/sourcing/url.py b/sourcing/url.py index a4f733b..4a1f693 100644 --- a/sourcing/url.py +++ b/sourcing/url.py @@ -1,42 +1,61 @@ -import requests -from .model import Item import os.path import re +import typing + +import requests + +from .model import Item project_dir = os.path.dirname(os.path.dirname(__file__)) -cache_location = os.path.join(project_dir, 'cache') +cache_location = os.path.join(project_dir, "cache") -re_colon_slash = re.compile('[/:]+') +re_colon_slash = re.compile("[/:]+") -def url_filename(url): - return re_colon_slash.sub('_', url) -def get_text(url): +def url_filename(url: str) -> str: + """Generate filename from URL.""" + return re_colon_slash.sub("_", url) + + +class ExternalText(typing.TypedDict): + """Text from external URL.""" + + url: str + text: str + heading: str + length: int + + +def get_text(url: str) -> ExternalText: + """Get text from URL and return as dict.""" # assume UTF-8 text = get_url(url) - heading = url.rsplit('/', 1)[-1] + heading = url.rsplit("/", 1)[-1] return { - 'url': url, - 'text': text, - 'heading': heading, - 'length': len(text), + "url": url, + "text": text, + "heading": heading, + "length": len(text), } -def get_url(url): + +def get_url(url: str) -> str: + """Read a URL and return the content.""" item = Item.from_external(url) if item: - return item.text + return typing.cast(str, item.text) + content = requests.get(url).content - return content.decode(errors='replace') + return content.decode(errors="replace") filename = os.path.join(cache_location, url_filename(url)) if os.path.exists(filename): - content = open(filename, 'rb').read() + content = open(filename, "rb").read() else: content = requests.get(url).content - open(filename, 'wb').write(content) + open(filename, "wb").write(content) - return content.decode(errors='replace') + return content.decode(errors="replace") diff --git a/sourcing/utils.py b/sourcing/utils.py index 519468f..e6d903a 100644 --- a/sourcing/utils.py +++ b/sourcing/utils.py @@ -1,25 +1,29 @@ -import humanize -from datetime import date, timedelta +from datetime import date, datetime, timedelta -def display_datetime(dt): +import humanize + + +def display_datetime(dt: datetime) -> str: + """Render datetime as a string for display.""" if dt is None: - return 'n/a' + return "n/a" today = date.today() if today - dt.date() < timedelta(days=1): return humanize.naturaltime(dt) else: - return dt.strftime('%a, %d %b %Y') + return dt.strftime("%a, %d %b %Y") -def nbsp_at_start(line): - ''' Protect spaces at the start of a string. ''' + +def nbsp_at_start(line: str) -> str: + """Protect spaces at the start of a string.""" space_count = 0 for c in line: - if c != ' ': + if c != " ": break space_count += 1 # return Markup(' ') * space_count + line[space_count:] - return '\u00A0' * space_count + line[space_count:] + return "\u00A0" * space_count + line[space_count:] -def protect_start_spaces(text): - return '\n'.join(nbsp_at_start(line) for line in text.splitlines()) +def protect_start_spaces(text: str) -> str: + return "\n".join(nbsp_at_start(line) for line in text.splitlines()) diff --git a/sourcing/view.py b/sourcing/view.py index d3d7d25..2b63681 100644 --- a/sourcing/view.py +++ b/sourcing/view.py @@ -1,44 +1,63 @@ -from flask import (Blueprint, render_template, request, redirect, flash, - url_for, abort, jsonify, Response, current_app) -from flask_login import (login_user, current_user, logout_user, - login_required, LoginManager) -from .forms import (LoginForm, SignupForm, AccountSettingsForm, - UploadSourceDocForm, SourceDocForm, ItemForm, - ForgotPasswordForm, PasswordForm) -from .model import User, SourceDoc, Item, XanaPage, XanaLink, Reference -from .parse import parse_xanapage_facet, parse_link -from .url import get_url -from .mail import send_mail -from .edl import fulfil_edl_with_sources, fulfil_edl, parse_edl, fulfil_edl_with_links -from .span import Span -from .edit import apply_edits -from .database import session -from .text import iter_lines, add_highlight -from werkzeug.debug.tbtools import get_current_traceback -from jinja2 import evalcontextfilter, Markup -from functools import wraps -from .utils import nbsp_at_start -from itsdangerous import URLSafeTimedSerializer -# from sqlalchemy_continuum import version_class - import json import re +import typing +from functools import wraps + +import flask +from flask_login import ( + LoginManager, + current_user, + login_required, + login_user, + logout_user, +) +from itsdangerous import URLSafeTimedSerializer +from werkzeug.wrappers import Response + +from .database import session +from .edit import apply_edits +from .edl import fulfil_edl, fulfil_edl_with_links, fulfil_edl_with_sources, parse_edl +from .forms import ( + AccountSettingsForm, + ForgotPasswordForm, + ItemForm, + LoginForm, + PasswordForm, + SignupForm, + SourceDocForm, +) +from .mail import send_mail +from .model import Item, Reference, SourceDoc, User, XanaLink, XanaPage +from .parse import parse_link, parse_xanapage_facet +from .span import Span +from .text import add_highlight, iter_lines +from .url import get_url +from .utils import nbsp_at_start + +# from werkzeug.debug.tbtools import get_current_traceback +# from jinja2 import evalcontextfilter, Markup +# from sqlalchemy_continuum import version_class + login_manager = LoginManager() -login_manager.login_view = '.login' -re_paragraph = re.compile(r'(?:\r\n|\r|\n){2,}') -re_spanpointer = re.compile(r'([A-Za-z0-9]+),start=(\d+),length=(\d+)') -bp = Blueprint('view', __name__) +login_manager.login_view = ".login" +re_paragraph = re.compile(r"(?:\r\n|\r|\n){2,}") +re_spanpointer = re.compile(r"([A-Za-z0-9]+),start=(\d+),length=(\d+)") +bp = flask.Blueprint("view", __name__) -def init_app(app): + +def init_app(app: flask.Flask) -> None: + """Initialise app.""" login_manager.init_app(app) app.register_blueprint(bp) - @app.template_filter() - @evalcontextfilter - def newline_html(eval_ctx, value): - return u'\n\n'.join(Markup(u'

') + p.replace('\n', Markup('
')) + Markup(u'

') - for p in re_paragraph.split(value)) + +# @app.template_filter() +# @evalcontextfilter +# def newline_html(eval_ctx, value): +# return u'\n\n'.join(Markup(u'

') + p.replace('\n', Markup('
')) + Markup(u'

') +# for p in re_paragraph.split(value)) + @login_manager.user_loader def load_user(user_id): @@ -46,270 +65,314 @@ def load_user(user_id): # where do we redirect after signup is complete -view_after_signup = '.home' +view_after_signup = ".home" + @bp.context_processor def inject_user(): return dict(current_user=current_user) + def show_errors(f): @wraps(f) def wrapper(*args, **kwargs): try: return f(*args, **kwargs) except Exception: - traceback = get_current_traceback(skip=1, show_hidden_frames=False, - ignore_system_exceptions=True) - return traceback.render_full().encode('utf-8', 'replace') + traceback = get_current_traceback( + skip=1, show_hidden_frames=False, ignore_system_exceptions=True + ) + return traceback.render_full().encode("utf-8", "replace") + return wrapper -@bp.route('/') -def home(): + +@bp.route("/") +def home() -> str: + """Home page.""" docs = Item.query.order_by(Item.created) docs_info = [] for item in docs: cur = { - 'user': item.user.username, - 'id': item.id, - 'type': item.type, - 'year': item.created.year, + "user": item.user.username, + "id": item.id, + "type": item.type, + "year": item.created.year, } - if item.type == 'xanalink': - cur['link_type'] = item.link_type + if item.type == "xanalink": + cur["link_type"] = item.link_type docs_info.append(cur) users = [item_user.username for item_user in User.query] - years = sorted({item['year'] for item in docs_info}) + years = sorted({item["year"] for item in docs_info}) - link_types = {item['link_type'] for item in docs_info if item.get('link_type')} + link_types = {item["link_type"] for item in docs_info if item.get("link_type")} - return render_template('home.html', - docs=docs, - link_types=link_types, - years=years, - docs_info=docs_info, - nbsp_at_start=nbsp_at_start, - users=users) + return flask.render_template( + "home.html", + docs=docs, + link_types=link_types, + years=years, + docs_info=docs_info, + nbsp_at_start=nbsp_at_start, + users=users, + ) -@bp.route('/password_reset', methods=['GET', 'POST']) + +@bp.route("/password_reset", methods=["GET", "POST"]) def password_reset(): - site_name = 'perma.pub' # FIXME: move to config + site_name = "perma.pub" # FIXME: move to config form = ForgotPasswordForm() if not form.validate_on_submit(): - return render_template('auth/password_reset.html', form=form) - ts = URLSafeTimedSerializer(current_app.config["SECRET_KEY"]) + return flask.render_template("auth/password_reset.html", form=form) + ts = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"]) user = User.lookup_user_or_email(form.user_or_email.data) if user: - token = ts.dumps(user.id, salt='password-reset') - reset_link = url_for('.reset_with_token', token=token, _external=True) - reset_mail = render_template('mail/password_reset.txt', - reset_link=reset_link, - site_name=site_name, - user=user) - subject = 'Password reset on ' + site_name + token = ts.dumps(user.id, salt="password-reset") + reset_link = flask.url_for(".reset_with_token", token=token, _external=True) + reset_mail = flask.render_template( + "mail/password_reset.txt", + reset_link=reset_link, + site_name=site_name, + user=user, + ) + subject = "Password reset on " + site_name send_mail(user, subject, reset_mail) - return redirect(url_for('.password_reset_sent')) + return flask.redirect(flask.url_for(".password_reset_sent")) -@bp.route('/password_reset/sent', methods=['GET', 'POST']) + +@bp.route("/password_reset/sent", methods=["GET", "POST"]) def password_reset_sent(): - return render_template('auth/password_reset_sent.html') + return flask.render_template("auth/password_reset_sent.html") -@bp.route('/reset/', methods=['GET', 'POST']) + +@bp.route("/reset/", methods=["GET", "POST"]) def reset_with_token(token): - ts = URLSafeTimedSerializer(current_app.config["SECRET_KEY"]) + ts = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"]) try: - user_id = ts.loads(token, salt='password-reset', max_age=86400) + user_id = ts.loads(token, salt="password-reset", max_age=86400) except Exception: - abort(404) + flask.abort(404) form = PasswordForm() if not form.validate_on_submit(): - return render_template('auth/password_reset_confirm.html', form=form) + return flask.render_template("auth/password_reset_confirm.html", form=form) user = User.query.get(user_id) user.set_password(form.password.data) session.add(user) session.commit() - return redirect(url_for('.password_reset_complete')) + return flask.redirect(flask.url_for(".password_reset_complete")) -@bp.route('/reset/done') -def password_reset_complete(): - return render_template('auth/password_reset_complete.html') -@bp.route('/source_doc_upload', methods=["POST"]) +@bp.route("/reset/done") +def password_reset_complete() -> str: + return flask.render_template("auth/password_reset_complete.html") + + +@bp.route("/source_doc_upload", methods=["POST"]) @show_errors def source_doc_upload(): - f = request.files['sourcedoc_file'] + f = flask.request.files["sourcedoc_file"] text = f.read() doc = SourceDoc(text=text, user=current_user, filename=f.filename) session.add(doc) session.commit() - flash('new source document uploaded') - return redirect(doc.url) + flask.flash("new source document uploaded") + return flask.redirect(doc.url) -@bp.route('/about') -def about(): - return render_template('about.html') -@bp.route('/contact') -def contact(): - return render_template('contact.html') +@bp.route("/about") +def about() -> str: + """About page.""" + return flask.render_template("about.html") -def redirect_to_home(): - return redirect(url_for('.home')) -@bp.route('/login', methods=['GET', 'POST']) -def login(): - form = LoginForm(next=request.args.get('next')) +@bp.route("/contact") +def contact() -> str: + """Contact page.""" + return flask.render_template("contact.html") + + +def redirect_to_home() -> Response: + """Redirect to home page.""" + return flask.redirect(flask.url_for(".home")) + + +@bp.route("/login", methods=["GET", "POST"]) +def login() -> Response | str: + """Login page.""" + form = LoginForm(next=flask.request.args.get("next")) if form.validate_on_submit(): login_user(form.user, remember=form.remember.data) - flash('Logged in successfully.') - return redirect(request.form.get('next') or url_for('.home')) - return render_template('login.html', form=form) + flask.flash("Logged in successfully.") + return flask.redirect(flask.request.form.get("next") or flask.url_for(".home")) + return flask.render_template("login.html", form=form) -@bp.route('/logout') -def logout(): + +@bp.route("/logout") +def logout() -> Response: + """Logout and redirect to home.""" logout_user() - flash('You have been logged out.') + flask.flash("You have been logged out.") return redirect_to_home() -@bp.route('/signup', methods=['GET', 'POST']) + +@bp.route("/signup", methods=["GET", "POST"]) def signup(): - if not current_app.config.get('ALLOW_SIGNUP'): - abort(404) + if not flask.current_app.config.get("ALLOW_SIGNUP"): + flask.abort(404) form = SignupForm() if not form.validate_on_submit(): - return render_template('signup.html', form=form) + return flask.render_template("signup.html", form=form) - user = User(**form.data) + data = form.data.copy() + del data["csrf_token"] + user = User(**data) session.add(user) session.commit() - flash('New account created.') + flask.flash("New account created.") login_user(user) - return redirect(url_for(view_after_signup)) + return flask.redirect(flask.url_for(view_after_signup)) -def redirect_to_doc(doc): - return redirect(url_for('.view_document', hashid=doc.hashid)) -def get_source_doc(username, hashid): +def redirect_to_doc(doc: Item) -> Response: + """Redirect to the given item.""" + return flask.redirect(flask.url_for(".view_document", hashid=doc.hashid)) + + +def get_source_doc(username: str, hashid: str) -> SourceDoc: + """Get a source doc that belongs to the given uesr.""" doc = Item.get_by_hashid(hashid) - if doc and doc.user.username != username: - doc = None - return doc if doc else abort(404) + if not doc or doc.user.username != username: + flask.abort(404) + return typing.cast(SourceDoc, doc) -def get_xanapage(username, hashid): + +def get_xanapage(username: str, hashid: str) -> XanaPage: + """Get a xanapage that belongs to the given uesr.""" doc = Item.get_by_hashid(hashid) - if doc and doc.user.username != username: - doc = None - return doc if doc and doc.type == 'xanapage' else abort(404) + if not doc or doc.type != "xanapage" or doc.user.username != username: + flask.abort(404) + return typing.cast(XanaPage, doc) -def get_item(username, hashid): + +def get_item(username: str, hashid: str) -> Item: + """Get an item with the given hashid, check ownership.""" doc = Item.get_by_hashid(hashid) - if doc and doc.user.username != username: - doc = None - return doc if doc else abort(404) + if not doc or doc.user.username != username: + flask.abort(404) + return doc -@bp.route('///edl') -def view_edl(username, hashid): + +@bp.route("///edl") +def view_edl(username: str, hashid: str) -> str: + """Show EDL for xanapage.""" item = get_xanapage(username, hashid) - return render_template('view.html', - doc=item, - iter_lines=iter_lines, - nbsp_at_start=nbsp_at_start) + return flask.render_template( + "view.html", doc=item, iter_lines=iter_lines, nbsp_at_start=nbsp_at_start + ) -@bp.route('///realize') + +@bp.route("///realize") def realize_edl(username, hashid): item = get_xanapage(username, hashid) spans = list(fulfil_edl(item.text)) - doc_text = ''.join(span['text'] for span in spans) + doc_text = "".join(span["text"] for span in spans) - return render_template('realize.html', - doc=doc_text, - iter_lines=iter_lines, - item=item, - nbsp_at_start=nbsp_at_start) + return flask.render_template( + "realize.html", + doc=doc_text, + iter_lines=iter_lines, + item=item, + nbsp_at_start=nbsp_at_start, + ) -@bp.route('///raw') -def view_item_raw(username, hashid): + +@bp.route("///raw") +def view_item_raw(username: str, hashid: str): return view_item(username, hashid, raw=True) -def fulfil_xanaflight(item): + +def fulfil_xanaflight(item: XanaLink) -> str: link = item.parse() - assert link['type'] == 'flight' - facets = link['facets'] + assert link["type"] == "flight" + facets = link["facets"] docs = [] edl_list = [] all_links = [] for facet in facets: xanapage = Item.from_external(parse_xanapage_facet(facet)) - assert xanapage.type == 'xanapage' + assert xanapage and xanapage.type == "xanapage" and xanapage.text edl = parse_edl(xanapage.text) edl_list.append((xanapage, edl)) - all_links += [parse_link(link['text']) for link in edl['links']] + all_links += [parse_link(link["text"]) for link in edl["links"]] for doc_num, (xanapage, edl) in enumerate(edl_list): - doc = fulfil_edl_with_links(edl, - doc_num=doc_num, - links=all_links, - hide_all_transclusions=True) - doc['hashid'] = xanapage.hashid - del doc['link_count'] + doc = fulfil_edl_with_links( + edl, doc_num=doc_num, links=all_links, hide_all_transclusions=True + ) + doc["hashid"] = xanapage.hashid + del doc["link_count"] docs.append(doc) - return render_template('view/xanaflight.html', - item=item, - link_count=len(all_links), - docs=docs) + return flask.render_template( + "view/xanaflight.html", item=item, link_count=len(all_links), docs=docs + ) -@bp.route('///fulfil') + +@bp.route("///fulfil") def fulfil(username, hashid): item = get_item(username, hashid) - if item.type == 'xanapage': - return render_template('view/xanapage.html', - item=item, - doc=fulfil_edl_with_sources(item.text)) - if item.type == 'xanalink' and item.text.startswith('type=flight'): + if item.type == "xanapage": + return flask.render_template( + "view/xanapage.html", item=item, doc=fulfil_edl_with_sources(item.text) + ) + if item.type == "xanalink" and item.text.startswith("type=flight"): return fulfil_xanaflight(item) -@bp.route('///set_title', methods=['POST']) + +@bp.route("///set_title", methods=["POST"]) def set_title(username, hashid): item = get_item(username, hashid) - has_title = item.has_title - item.set_title(request.form['title'], current_user) - flash('title change saved' if has_title else 'title added') - return redirect(item.url) + has_title = item.has_title() + item.set_title(flask.request.form["title"], current_user) + flask.flash("title change saved" if has_title else "title added") + return flask.redirect(item.url) -@bp.route('///delete', methods=['POST']) + +@bp.route("///delete", methods=["POST"]) def delete_item(username, hashid): item = get_item(username, hashid) session.delete(item) session.commit() - flash('item deleted') + flask.flash("item deleted") return redirect_to_home() + def save_new_xanalink(doc1, doc2): - start1 = request.form['left_start'] - length1 = request.form['left_length'] - start2 = request.form['right_start'] - length2 = request.form['right_length'] + start1 = flask.request.form["left_start"] + length1 = flask.request.form["left_length"] + start2 = flask.request.form["right_start"] + length2 = flask.request.form["right_length"] assert length1 assert length2 - span1 = f'{doc1.external_url},start={start1},length={length1}' - span2 = f'{doc2.external_url},start={start2},length={length2}' + span1 = f"{doc1.external_url},start={start1},length={length1}" + span2 = f"{doc2.external_url},start={start2},length={length2}" lines = [ - 'type=', - 'facet=', - 'span: ' + span1, - 'facet=', - 'span: ' + span2, + "type=", + "facet=", + "span: " + span1, + "facet=", + "span: " + span2, ] - text = ''.join(line + '\n' for line in lines) + text = "".join(line + "\n" for line in lines) obj = XanaLink(user=current_user, text=text) session.add(obj) @@ -320,127 +383,148 @@ def save_new_xanalink(doc1, doc2): session.add(ref2) session.commit() -@bp.route('/build_links', methods=['GET', 'POST']) + +@bp.route("/build_links", methods=["GET", "POST"]) def build_links(): doc1, doc2 = None, None hashid1, hashid2 = None, None - if 'doc1' in request.args: - hashid1 = request.args['doc1'] + if "doc1" in flask.request.args: + hashid1 = flask.request.args["doc1"] doc1 = Item.get_by_hashid(hashid1) - if 'doc2' in request.args: - hashid2 = request.args['doc2'] + if "doc2" in flask.request.args: + hashid2 = flask.request.args["doc2"] doc2 = Item.get_by_hashid(hashid2) - if request.method == 'POST': + if flask.request.method == "POST": save_new_xanalink(doc1, doc2) - return redirect(url_for(request.endpoint, doc1=hashid1, doc2=hashid2)) + return flask.redirect( + flask.url_for(flask.request.endpoint, doc1=hashid1, doc2=hashid2) + ) if doc1 and doc2: - links = list({i for i in doc1.subjects} & - {i for i in doc2.subjects}) + links = list({i for i in doc1.subjects} & {i for i in doc2.subjects}) else: links = [] - return render_template('build_links.html', - iter_lines=iter_lines, - nbsp_at_start=nbsp_at_start, - SourceDoc=SourceDoc, - hashid1=hashid1, - hashid2=hashid2, - doc1=doc1, - doc2=doc2, - links=links) + return flask.render_template( + "build_links.html", + iter_lines=iter_lines, + nbsp_at_start=nbsp_at_start, + SourceDoc=SourceDoc, + hashid1=hashid1, + hashid2=hashid2, + doc1=doc1, + doc2=doc2, + links=links, + ) -@bp.route('//') -def view_item(username, hashid, raw=False): - if ',' in hashid: + +@bp.route("//") +def view_item(username: str, hashid: str, raw: bool = False) -> str | Response: + """View item.""" + if "," in hashid: m = re_spanpointer.match(hashid) + assert m hashid, start, length = m.group(1), int(m.group(2)), int(m.group(3)) item = get_item(username, hashid) if raw: - return Response(item.text[start:length + start], mimetype='text/plain') + assert item.text is not None + return flask.Response( + item.text[start : length + start], mimetype="text/plain" + ) else: start, length = None, None item = get_item(username, hashid) if raw: - return Response(item.text, mimetype='text/plain') + return flask.Response(item.text, mimetype="text/plain") - v = request.args.get('v') + v = flask.request.args.get("v") if v: if not v.isdigit(): - abort(404) + flask.abort(404) try: version = item.versions[int(v) - 1] except IndexError: - abort(404) + flask.abort(404) text = version.text else: version = None text = item.text - if item.type == 'xanapage': + if item.type == "xanapage": + assert item.text spans = list(fulfil_edl(item.text)) - doc_text = ''.join(span['text'] for span in spans) + doc_text = "".join(span["text"] for span in spans) else: doc_text = None - return render_template('view.html', - doc=item, - doc_text=doc_text, - version=version, - text=text, - span_start=start, - span_length=length, - add_highlight=add_highlight, - nbsp_at_start=nbsp_at_start, - iter_lines=iter_lines) + return flask.render_template( + "view.html", + doc=item, + doc_text=doc_text, + version=version, + text=text, + span_start=start, + span_length=length, + add_highlight=add_highlight, + nbsp_at_start=nbsp_at_start, + iter_lines=iter_lines, + ) -@bp.route('///history') -def history(username, hashid): + +@bp.route("///history") +def history(username: str, hashid: str) -> str: item = get_item(username, hashid) - return render_template('history.html', doc=item) + return flask.render_template("history.html", doc=item) -@bp.route('///as_xanapage', methods=['POST']) + +@bp.route("///as_xanapage", methods=["POST"]) def create_xanapage_from_sourcedoc(username, hashid): src_doc = get_source_doc(username, hashid) - edl = 'span: ' + src_doc.entire_span + '\n' + edl = "span: " + src_doc.entire_span + "\n" page = XanaPage(user=current_user, text=edl) session.add(page) session.commit() page.update_references() - flash('New xanapage created.') - return redirect(page.url) + flask.flash("New xanapage created.") + return flask.redirect(page.url) -@bp.route('///xanaedit', methods=['GET', 'POST']) + +@bp.route("///xanaedit", methods=["GET", "POST"]) def xanaedit_item(username, hashid): - if request.method == 'POST': + if flask.request.method == "POST": save_xanaedit(username, hashid) - return redirect(url_for('xanaedit_item', username=username, hashid=hashid)) + return flask.redirect( + flask.url_for("xanaedit_item", username=username, hashid=hashid) + ) doc = get_xanapage(username, hashid) spans = list(fulfil_edl(doc.text)) - doc_text = ''.join(span['text'] for span in spans) + doc_text = "".join(span["text"] for span in spans) - return render_template('xanaedit.html', doc=doc, doc_text=doc_text) + return flask.render_template("xanaedit.html", doc=doc, doc_text=doc_text) -def save_xanaedit(username, hashid): + +def save_xanaedit(username: str, hashid: str) -> XanaPage: + """Save a XanaEdit.""" page = get_xanapage(username, hashid) + assert page.text current_edl = parse_edl(page.text) - spans = [Span(*span) for span in current_edl['spans']] - edits = json.loads(request.form['edits']) - new_text = '' + spans = [Span(*span) for span in current_edl["spans"]] + edits = json.loads(flask.request.form["edits"]) + new_text = "" new_text_pos = 0 for edit in edits: - if edit['op'] not in ('insert', 'replace'): + if edit["op"] not in ("insert", "replace"): continue - new_text += edit['new'] - edit['span'] = Span('placeholder', new_text_pos, len(edit['new'])) - new_text_pos += len(edit['new']) + new_text += edit["new"] + edit["span"] = Span("placeholder", new_text_pos, len(edit["new"])) + new_text_pos += len(edit["new"]) spans = apply_edits(spans, edits) new_src_doc = SourceDoc(user=current_user, text=new_text) @@ -448,70 +532,78 @@ def save_xanaedit(username, hashid): session.commit() for span in spans: - if span.url == 'placeholder': + if span.url == "placeholder": span.url = new_src_doc.external_url - new_edl = ''.join(f'span: {span.for_edl()}\n' for span in spans) + new_edl = "".join(f"span: {span.for_edl()}\n" for span in spans) page.text = new_edl session.commit() page.update_references() - flash('Edits saved.') + flask.flash("Edits saved.") return page -@bp.route('///finish', methods=['POST']) -def finish_xanaedit(username, hashid): +@bp.route("///finish", methods=["POST"]) +def finish_xanaedit(username: str, hashid: str) -> Response: page = save_xanaedit(username, hashid) - return redirect(page.url) + return flask.redirect(page.url) -@bp.route('///edit', methods=['GET', 'POST']) + +@bp.route("///edit", methods=["GET", "POST"]) def edit_item(username, hashid): obj = get_item(username, hashid) form = SourceDocForm(obj=obj) if form.validate_on_submit(): form.populate_obj(obj) session.commit() - if obj.type == 'xanapage': + if obj.type == "xanapage": obj.update_references() - flash('Changes to {} saved.'.format(obj.type)) - return redirect(obj.url) - return render_template('edit.html', form=form, doc=obj) + flask.flash("Changes to {} saved.".format(obj.type)) + return flask.redirect(obj.url) + return flask.render_template("edit.html", form=form, doc=obj) -@bp.route('/source_doc_text/') + +@bp.route("/source_doc_text/") def source_doc_text(source_doc_id): doc = SourceDoc.query.get(source_doc_id) - return render_template('source_doc_text.html', doc=doc, iter_lines=iter_lines) + return flask.render_template("source_doc_text.html", doc=doc, iter_lines=iter_lines) -@bp.route('/settings/account', methods=['GET', 'POST']) + +@bp.route("/settings/account", methods=["GET", "POST"]) @login_required -def account_settings(): +def account_settings() -> str | Response: + """Account settings.""" form = AccountSettingsForm(obj=current_user) if form.validate_on_submit(): form.populate_obj(current_user) session.commit() - flash('Account details updated.') - return redirect(url_for(request.endpoint)) - return render_template('user/account.html', form=form) + flask.flash("Account details updated.") + assert flask.request.endpoint + return flask.redirect(flask.url_for(flask.request.endpoint)) + return flask.render_template("user/account.html", form=form) -@bp.route('/new/sourcedoc', methods=['GET', 'POST']) + +@bp.route("/new/sourcedoc", methods=["GET", "POST"]) @login_required -def new_sourcedoc(): +def new_sourcedoc() -> str | Response: + """Add a new sourcedoc.""" form = SourceDocForm() if form.validate_on_submit(): doc = SourceDoc(user=current_user) form.populate_obj(doc) session.add(doc) session.commit() - flash('New document saved.') - return redirect(doc.url) - return render_template('new.html', form=form, item_type='source document') + flask.flash("New document saved.") + return flask.redirect(doc.url) + return flask.render_template("new.html", form=form, item_type="source document") -@bp.route('/new/xanalink/raw', methods=['GET', 'POST']) + +@bp.route("/new/xanalink/raw", methods=["GET", "POST"]) @login_required def new_xanalink_raw(): form = ItemForm() @@ -520,72 +612,80 @@ def new_xanalink_raw(): form.populate_obj(obj) session.add(obj) session.commit() - flash('New xanalink saved.') - return redirect(obj.url) - return render_template('new.html', form=form, item_type='xanalink') + flask.flash("New xanalink saved.") + return flask.redirect(obj.url) + return flask.render_template("new.html", form=form, item_type="xanalink") -@bp.route('/new/xanalink', methods=['GET', 'POST']) + +@bp.route("/new/xanalink", methods=["GET", "POST"]) @login_required -def new_xanalink(): - if request.method != 'POST': - return render_template('new_xanalink.html') +def new_xanalink() -> str | Response: + """Add a new xanalink.""" + if flask.request.method != "POST": + return flask.render_template("new_xanalink.html") - data = request.get_json() - lines = ['type=' + data['link_type']] - for facet in data['facets']: - lines += ['facet='] + facet - text = ''.join(line + '\n' for line in lines) + data = flask.request.get_json() + lines = ["type=" + data["link_type"]] + for facet in data["facets"]: + lines += ["facet="] + facet + text = "".join(line + "\n" for line in lines) obj = XanaLink(user=current_user, text=text) session.add(obj) session.commit() - flash('New xanalink saved.') - return jsonify(url=obj.url) + flask.flash("New xanalink saved.") + return flask.jsonify(url=obj.url) -@bp.route('/new/xanapage', methods=['GET', 'POST']) + +@bp.route("/new/xanapage", methods=["GET", "POST"]) @login_required -def new_xanapage(): +def new_xanapage() -> str | Response: + """Start a new xanapage.""" form = ItemForm() if form.validate_on_submit(): obj = XanaPage(user=current_user) form.populate_obj(obj) session.add(obj) session.commit() - flash('New xanapage saved.') - return redirect(obj.url) - return render_template('new.html', form=form, item_type='xanapage') + flask.flash("New xanapage saved.") + return flask.redirect(obj.url) + return flask.render_template("new.html", form=form, item_type="xanapage") -@bp.route('/edit/', methods=['GET', 'POST']) + +@bp.route("/edit/", methods=["GET", "POST"]) @login_required -def edit_source_document(filename): +def edit_source_document(filename: str) -> str | Response: + """Edit a source document.""" doc = get_source_doc(current_user.username, filename) form = SourceDocForm(obj=doc) if form.validate_on_submit(): form.populate_obj(doc) session.add(doc) session.commit() - flash('Changes to document saved.') - return redirect(doc.url) - return render_template('edit.html', form=form, doc=doc) + flask.flash("Changes to document saved.") + return flask.redirect(doc.url) + return flask.render_template("edit.html", form=form, doc=doc) -@bp.route('/api/1/get//') -def api_get_document(username, filename): + +@bp.route("/api/1/get//") +def api_get_document(username: str, filename: str) -> Response: doc = get_source_doc(username, filename) - if not doc: - return abort(404) + assert doc.text is not None ret = { - 'username': username, - 'filename': filename, - 'character_count': len(doc.text), - 'document_price': str(doc.document_price), - 'price_per_character': str(doc.price_per_character), + "username": username, + "filename": filename, + "character_count": len(doc.text), + "document_price": str(doc.document_price), + "price_per_character": str(doc.price_per_character), } - return jsonify(ret) + return flask.jsonify(ret) -@bp.route('/get_span.json') -def get_span(): - url = request.args['url'] - start = int(request.args['start']) - length = int(request.args['length']) - spanid = request.args['spanid'] + +@bp.route("/get_span.json") +def get_span() -> Response: + """Return JSON representing a span.""" + url = flask.request.args["url"] + start = int(flask.request.args["start"]) + length = int(flask.request.args["length"]) + spanid = flask.request.args["spanid"] text = get_url(url) - return jsonify(text=text[start:start + length], spanid=spanid) + return flask.jsonify(text=text[start : start + length], spanid=spanid)