diff --git a/run.py b/run.py index b3736fd..de5dd14 100755 --- a/run.py +++ b/run.py @@ -3,5 +3,5 @@ from sourcing import create_app if __name__ == "__main__": - app = create_app("config.default") - app.run("0.0.0.0", debug=True) + app = create_app('config.default') + app.run('0.0.0.0', debug=True) diff --git a/sourcing/__init__.py b/sourcing/__init__.py index a8694de..aa2e424 100644 --- a/sourcing/__init__.py +++ b/sourcing/__init__.py @@ -1,14 +1,12 @@ from flask import Flask - -from . import database, view +from . import database +from . import view from .utils import display_datetime - -def create_app(config: str) -> Flask: - """Create the application.""" +def create_app(config): app = Flask(__name__) app.config.from_object(config) database.init_app(app) view.init_app(app) - app.jinja_env.filters["datetime"] = display_datetime + app.jinja_env.filters['datetime'] = display_datetime return app diff --git a/sourcing/cli.py b/sourcing/cli.py index b5dde45..60a53b9 100644 --- a/sourcing/cli.py +++ b/sourcing/cli.py @@ -1,50 +1,43 @@ +from . import create_app, model, database, edl from pprint import pprint - import click -from . import create_app, database, edl, model - -app = create_app("config.default") - +app = create_app('config.default') @app.cli.command() -@click.argument("user_or_email") -@click.option("--password", prompt=True, hide_input=True) +@click.argument('user_or_email') +@click.option('--password', prompt=True, hide_input=True) def reset_password(user_or_email, password): user = model.User.lookup_user_or_email(user_or_email) user.set_password(password) database.session.commit() - print(f"password updated for {user.username} ({user.email})") - + print(f'password updated for {user.username} ({user.email})') @app.cli.command() -@click.argument("hashid") +@click.argument('hashid') def parse_link(hashid): - home = "http://localhost:5000/" + home = 'http://localhost:5000/' item = model.Item.get_by_hashid(hashid) pprint(item.parse()) print(item.item_and_title(home)) - @app.cli.command() def all_titles(): - home = "http://localhost:5000/" + home = 'http://localhost:5000/' titles = model.XanaLink.get_all_titles(home=home) print(titles.values()) - @app.cli.command() -@click.argument("hashid") +@click.argument('hashid') def delete_item(hashid): item = model.Item.get_by_hashid(hashid) database.session.delete(item) database.session.commit() - @app.cli.command() def populate_references(): - home = "http://localhost:5000/" + home = 'http://localhost:5000/' seen = set() for ref in model.Reference.query: @@ -52,14 +45,14 @@ def populate_references(): for link_obj in model.XanaLink.query: link = link_obj.parse() - for items in link["facets"]: + for items in link['facets']: for i in items: - k, _, v = i.partition(": ") - if k == "span" and "," in v: - v = v.partition(",")[0] + k, _, v = i.partition(': ') + if k == 'span' and ',' in v: + v = v.partition(',')[0] item = model.Item.from_external(v, home=home) if item: - print(link_obj.id, "->", item.id) + print(link_obj.id, '->', item.id) as_tuple = (link_obj.id, item.id) if as_tuple in seen: continue @@ -69,13 +62,13 @@ def populate_references(): for xanapage in model.XanaPage.query: doc_edl = edl.parse_edl(xanapage.text) - if "spans" not in doc_edl or not doc_edl["spans"]: + if 'spans' not in doc_edl or not doc_edl['spans']: continue - for url, start, length in doc_edl["spans"]: + for url, start, length in doc_edl['spans']: src_doc = model.Item.from_external(url, home=home) if not src_doc.id: continue - print(xanapage.id, "->", src_doc.id) + print(xanapage.id, '->', src_doc.id) as_tuple = (xanapage.id, src_doc.id) if as_tuple in seen: continue @@ -85,11 +78,10 @@ def populate_references(): database.session.commit() - @app.cli.command() -@click.argument("hashid") +@click.argument('hashid') def show_references(hashid): item = model.Item.get_by_hashid(hashid) - print("item_id:", item.id) - print("subjects:", [i.id for i in item.subjects]) - print("objects:", [i.id for i in item.objects]) + print('item_id:', item.id) + print('subjects:', [i.id for i in item.subjects]) + print('objects:', [i.id for i in item.objects]) diff --git a/sourcing/database.py b/sourcing/database.py index 8ff6c36..fe92862 100644 --- a/sourcing/database.py +++ b/sourcing/database.py @@ -1,28 +1,18 @@ -"""Database.""" - -import flask -import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker session = scoped_session(sessionmaker()) - -def init_db(db_url: str) -> None: - """Initialise databsae.""" +def init_db(db_url): session.configure(bind=get_engine(db_url)) +def get_engine(db_url): + return create_engine(db_url, pool_recycle=3600) -def get_engine(db_url: str) -> sqlalchemy.engine.base.Engine: - """Create an engine object.""" - return create_engine(db_url, echo=False, pool_recycle=3600) - - -def init_app(app: flask.app.Flask) -> None: - """Initialise database connection within flask app.""" - db_url = app.config["DB_URL"] +def init_app(app): + db_url = app.config['DB_URL'] session.configure(bind=get_engine(db_url)) @app.teardown_appcontext - def shutdown_session(exception: Exception | None = None) -> None: + def shutdown_session(exception=None): session.remove() diff --git a/sourcing/edit.py b/sourcing/edit.py index 0fb7abf..5f6cf1d 100644 --- a/sourcing/edit.py +++ b/sourcing/edit.py @@ -1,43 +1,25 @@ -"""Edit.""" - -import typing - import attr -from .span import Span - - class EditOutOfRange(Exception): - """Edit out of range.""" + pass - -class Edit(typing.TypedDict): - """Edit.""" - - start: int - old: str - span: Span - op: str - - -def apply_delete(current_spans: list[Span], edit: Edit) -> list[Span]: - """Apply delete.""" +def apply_delete(current_spans, edit): assert edit if not current_spans: - raise ValueError("edit is out of bounds") + raise ValueError('edit is out of bounds') spans = [] pos = 0 - edit_end = edit["start"] + len(edit["old"]) + edit_end = edit['start'] + len(edit['old']) cur_span = current_spans.pop(0) - while pos + cur_span.length < edit["start"]: + while pos + cur_span.length < edit['start']: spans.append(cur_span) pos += cur_span.length cur_span = current_spans.pop(0) - if edit["start"] > pos: - new_span = attr.evolve(cur_span, length=edit["start"] - pos) + if edit['start'] > pos: + new_span = attr.evolve(cur_span, length=edit['start'] - pos) spans.append(new_span) while pos + cur_span.length < edit_end: @@ -48,27 +30,28 @@ def apply_delete(current_spans: list[Span], edit: Edit) -> list[Span]: offset = cur_span.start - pos new_start = offset + (edit_end - pos) diff = new_start - cur_span.start - new_span = attr.evolve(cur_span, length=cur_span.length - diff, start=new_start) + new_span = attr.evolve(cur_span, + length=cur_span.length - diff, + start=new_start) spans.append(new_span) spans += current_spans return spans - -def apply_insert(current_spans: list[Span], edit: Edit) -> Span | list[Span]: - if not current_spans and edit["0"] == 0: - return edit["span"] +def apply_insert(current_spans, edit): + if not current_spans and edit['0'] == 0: + return edit['span'] pos = 0 spans = [] cur_span = current_spans.pop(0) - while pos + cur_span.length < edit["start"]: + while pos + cur_span.length < edit['start']: spans.append(cur_span) pos += cur_span.length cur_span = current_spans.pop(0) - if edit["start"] >= pos: - length_a = edit["start"] - pos + if edit['start'] >= pos: + length_a = edit['start'] - pos length_b = cur_span.length - length_a if length_a: @@ -76,34 +59,35 @@ def apply_insert(current_spans: list[Span], edit: Edit) -> Span | list[Span]: pos += length_a spans.append(span_a) - spans.append(edit["span"]) - pos += edit["span"].length + spans.append(edit['span']) + pos += edit['span'].length if length_b: - span_b = attr.evolve( - cur_span, start=cur_span.start + length_a, length=length_b - ) + span_b = attr.evolve(cur_span, + start=cur_span.start + length_a, + length=length_b) spans.append(span_b) pos += length_b else: - spans.append(edit["span"]) + spans.append(edit['span']) spans += current_spans return spans - -def apply_edits(spans: list[Span], edits: list[Edit]) -> list[Span]: +def apply_edits(spans, edits): for edit in edits: - if edit["op"] == "delete": + if edit['op'] == 'delete': spans = apply_delete(spans, edit) continue - if edit["op"] == "insert": + if edit['op'] == 'insert': spans = apply_insert(spans, edit) continue - if edit["op"] == "replace": + if edit['op'] == 'replace': spans = apply_delete(spans, edit) spans = apply_insert(spans, edit) continue return spans + + diff --git a/sourcing/edl.py b/sourcing/edl.py index 9ec0a6b..a3a8b7f 100644 --- a/sourcing/edl.py +++ b/sourcing/edl.py @@ -1,126 +1,87 @@ -"""Edit decision list.""" - -import re -import typing +from .url import get_url, get_text +from .parse import get_span, parse_span, parse_link, parse_sourcedoc_facet, xanapage_span_html, span_html, get_urls from collections import defaultdict from html import escape -from pprint import pprint - -from .parse import ( - SourceText, - SpanContents, - get_span, - get_urls, - parse_link, - parse_sourcedoc_facet, - parse_span, - span_html, - xanapage_span_html, -) -from .url import get_text, get_url from .utils import protect_start_spaces -re_comment = re.compile(r"#.*") -re_xanalink = re.compile("xanalink: +([^ ]+) *$") +import re + +re_comment = re.compile(r'#.*') +re_xanalink = re.compile('xanalink: +([^ ]+) *$') max_sourcedoc_size = 600000 - -def fulfil_edl(edl: str) -> typing.Iterator[SpanContents]: - """Yeild each span of an EDL.""" - text: SourceText = {} - for url, start, length in parse_edl(edl)["spans"]: +def fulfil_edl(edl): + text = {} + for url, start, length in parse_edl(edl)['spans']: if url not in text: text[url] = get_text(url) yield get_span(text, url, start, length) - -class EDLDict(typing.TypedDict): - """Dict representing an EDL.""" - - spans: list[tuple[str, int, int]] - links: list[dict[str, typing.Any]] - - -def parse_edl(edl_text: str) -> EDLDict: - """Parse an EDL and return contents.""" - edl: EDLDict = { - "spans": [], - "links": [], +def parse_edl(edl_text): + edl = { + 'spans': [], + 'links': [], } for line in edl_text.splitlines(): - line = re_comment.sub("", line).strip() + line = re_comment.sub('', line).strip() if not line: continue span_pointer = parse_span(line) if span_pointer: - edl["spans"].append(span_pointer) + edl['spans'].append(span_pointer) continue m = re_xanalink.match(line) if m: link_url = m.group(1) - edl["links"].append( - { - "url": link_url, - "text": get_url(link_url), - } - ) + edl['links'].append({ + 'url': link_url, + 'text': get_url(link_url), + }) continue return edl - -def fulfil_edl_with_sources( - edl_text: str, links=None, hide_all_transclusions: bool = False -): +def fulfil_edl_with_sources(edl_text, links=None, hide_all_transclusions=False): edl = parse_edl(edl_text) - return fulfil_edl_with_links( - edl, links=links, hide_all_transclusions=hide_all_transclusions - ) + return fulfil_edl_with_links(edl, + links=links, + hide_all_transclusions=hide_all_transclusions) - -def fulfil_edl_with_links( - edl: EDLDict, - doc_num: int | str = "", - links: list[dict[str, str]] | None = None, - hide_all_transclusions: bool = False, -): - spans = edl["spans"] +def fulfil_edl_with_links(edl, doc_num='', links=None, hide_all_transclusions=False): + spans = edl['spans'] hide_transclusions = set() two_facet_links = [] if not links: - links = [parse_link(link["text"]) for link in edl["links"]] + links = [parse_link(link['text']) for link in edl['links']] link_num = 0 for link in links: - if link["type"] == "HideTransclusions": - hide_transclusions.add(parse_sourcedoc_facet(link["facets"][0])) - elif len(link["facets"]) == 2: - two_facet_links.append( - (link_num, [parse_span(span[0]) for span in link["facets"]]) - ) + if link['type'] == 'HideTransclusions': + hide_transclusions.add(parse_sourcedoc_facet(link['facets'][0])) + elif len(link['facets']) == 2: + two_facet_links.append((link_num, [parse_span(span[0]) for span in link['facets']])) link_num += 1 source = [get_text(url) for url in get_urls(spans)] - source_text = {s["url"]: s["text"] for s in source} + source_text = {s['url']: s['text'] for s in source} source_doc_links = defaultdict(list) for link_num, facets in two_facet_links: for facet_num, span in enumerate(facets): - assert span url, start, length = span - source_doc_links[url].append((start, length, link_num, "link", facet_num)) + source_doc_links[url].append((start, length, link_num, 'link', facet_num)) if url in source_text: continue s = get_text(url) source.append(s) - source_text[s["url"]] = s["text"] + source_text[s['url']] = s['text'] for s in source_doc_links.values(): s.sort() @@ -131,67 +92,28 @@ def fulfil_edl_with_links( for num, (url, start, length) in spans: highlight = not hide_all_transclusions and url not in hide_transclusions span_text = source_text[url] # [start:start + length] - new_text = "" + new_text = '' pos = start - for link_start, link_len, link_num, span_type, facet_num in source_doc_links[ - url - ]: + for link_start, link_len, link_num, span_type, facet_num in source_doc_links[url]: link_end = link_start + link_len if link_start >= start + length: break if link_end < start: continue - cls = "xanapagelink link" - link_span = ( - f'' - + escape(span_text[link_start:link_end]) - + "" - ) + cls = 'xanapagelink link' + link_span = (f'' + + escape(span_text[link_start:link_end]) + + '') new_text += escape(span_text[pos:link_start]) + link_span pos = link_end - new_text += escape(span_text[pos : start + length]) + new_text += escape(span_text[pos:start + length]) cur = xanapage_span_html(num, new_text, url, start, length, highlight=highlight) doc_spans.append(cur) - doc = "".join(doc_spans) - - for s in source: - text = protect_start_spaces(s.pop("text")) - if s["length"] > max_sourcedoc_size: - # print('{} > {}'.format(s['length'], max_sourcedoc_size)) - continue - if s["url"] in hide_transclusions: - continue - source_spans = [ - (start, length, num, "transclusion", 0) - for num, (url, start, length) in spans - if url == s["url"] - ] - source_spans += source_doc_links[s["url"]] - source_spans.sort() - - new_text = "" - pos = 0 - - pprint(source_spans) - - for start, length, num, span_type, _ in source_spans: - end = start + length - new_text += ( - escape(text[pos:start]) - + span_html(span_type, num) - + escape(text[start:end]) - + "" - ) - pos = end - new_text += escape(text[pos:]) - new_text = new_text.replace("\n", "\n") - - s["text"] = new_text + doc = ''.join(doc_spans) return { - "source": source, - "doc": doc.replace("\n", "\n"), - "span_count": len(spans), - "link_count": len(two_facet_links), + 'doc': doc.replace('\n', '\n'), + 'span_count': len(spans), + 'link_count': len(two_facet_links), } diff --git a/sourcing/forms.py b/sourcing/forms.py index c6c1c40..2515ec5 100644 --- a/sourcing/forms.py +++ b/sourcing/forms.py @@ -1,135 +1,82 @@ from flask_wtf import FlaskForm -from wtforms.fields import ( - BooleanField, - FileField, - HiddenField, - IntegerField, - PasswordField, - StringField, - TextAreaField, -) -from wtforms.validators import ( - Email, - InputRequired, - Length, - NoneOf, - Optional, - Regexp, - ValidationError, -) - -from .model import LoginError, User, re_username, reserved_name, user_exists +from wtforms.fields import StringField, PasswordField, BooleanField, HiddenField, TextAreaField, FileField, IntegerField +from wtforms.validators import InputRequired, Email, Length, ValidationError, Regexp, NoneOf, Optional +from .model import User, LoginError, re_username, reserved_name, user_exists PASSWORD_LEN = 64 EMAIL_LEN = 64 - class SignupForm(FlaskForm): - """Signup form.""" - - username = StringField( - "username", - [ - InputRequired(), - Regexp(re_username), - NoneOf(reserved_name, message="Not available."), - Length(min=3, max=64), - ], - [lambda name: name and name.replace(" ", "_")], - ) - email = StringField( - "e-mail address", - [InputRequired(), Email(), Length(min=5, max=EMAIL_LEN)], - description="we never share your e-mail address", - ) - password = StringField( - "password", [InputRequired(), Length(min=4, max=PASSWORD_LEN)] - ) + username = StringField('username', + [InputRequired(), + Regexp(re_username), + NoneOf(reserved_name, message='Not available.'), + Length(min=3, max=64)], + [lambda name: name and name.replace(' ', '_')]) + email = StringField('e-mail address', + [InputRequired(), Email(), + Length(min=5, max=EMAIL_LEN)], + description="we never share your e-mail address") + password = StringField('password', + [InputRequired(), Length(min=4, max=PASSWORD_LEN)]) def validate_username(form, field): if user_exists(User.username, field.data): - raise ValidationError("Not available") + raise ValidationError('Not available') def validate_email(form, field): if user_exists(User.email, field.data): - raise ValidationError("In use by another account") - + raise ValidationError('In use by another account') class LoginForm(FlaskForm): - """Login form.""" + user_or_email = StringField('username or e-mail address', + [InputRequired(), Length(min=3, max=EMAIL_LEN)], + [lambda name: name and name.replace(' ', '_')]) + password = PasswordField('password', + [InputRequired(), Length(max=PASSWORD_LEN)]) + remember = BooleanField('stay logged in') + next = HiddenField('next') - user_or_email = StringField( - "username or e-mail address", - [InputRequired(), Length(min=3, max=EMAIL_LEN)], - [lambda name: name and name.replace(" ", "_")], - ) - password = PasswordField("password", [InputRequired(), Length(max=PASSWORD_LEN)]) - remember = BooleanField("stay logged in") - next = HiddenField("next") - - def validate(self) -> bool: - """Validate.""" + def validate(self): rv = FlaskForm.validate(self) if not rv: return False try: - self.user = User.attempt_login(self.user_or_email.data, self.password.data) + self.user = User.attempt_login(self.user_or_email.data, + self.password.data) return True except LoginError as e: self.user_or_email.errors.append(e.msg) return False - class ForgotPasswordForm(FlaskForm): - """Forgot password form.""" - - user_or_email = StringField( - "username or e-mail address", [InputRequired(), Length(max=EMAIL_LEN)] - ) - + user_or_email = StringField('username or e-mail address', + [InputRequired(), Length(max=EMAIL_LEN)]) class PasswordForm(FlaskForm): - """Password form.""" - - password = PasswordField( - "new password", [InputRequired(), Length(min=4, max=PASSWORD_LEN)] - ) - + password = PasswordField('new password', + [InputRequired(), Length(min=4, max=PASSWORD_LEN)]) class AccountSettingsForm(FlaskForm): - """Account settings form.""" - - full_name = StringField("full name", [Length(max=64)]) - email = StringField( - "e-mail address", [InputRequired(), Email(), Length(min=5, max=EMAIL_LEN)] - ) - + full_name = StringField('full name', [Length(max=64)]) + email = StringField('e-mail address', + [InputRequired(), Email(), + Length(min=5, max=EMAIL_LEN)]) class ChangePasswordForm(FlaskForm): - """Change password form.""" - - old_password = PasswordField( - "current password", [InputRequired(), Length(max=PASSWORD_LEN)] - ) - new_password = PasswordField( - "new password", [InputRequired(), Length(max=PASSWORD_LEN)] - ) - + old_password = PasswordField('current password', + [InputRequired(), Length(max=PASSWORD_LEN)]) + new_password = PasswordField('new password', + [InputRequired(), Length(max=PASSWORD_LEN)]) class SourceDocForm(FlaskForm): - """Source doc form.""" - - text = TextAreaField("text", [InputRequired()]) - db_price_per_character = IntegerField("price per character", [Optional()]) - db_document_price = IntegerField("document price", [Optional()]) - + text = TextAreaField('text', [InputRequired()]) + db_price_per_character = IntegerField('price per character', [Optional()]) + db_document_price = IntegerField('document price', [Optional()]) class ItemForm(FlaskForm): - """Item form.""" - - text = TextAreaField("text", [InputRequired()]) - + text = TextAreaField('text', [InputRequired()]) class UploadSourceDocForm(FlaskForm): - sourcedoc_file = FileField("SourceDoc", [Regexp(r"^[^/\\]+\.txt$")]) + sourcedoc_file = FileField('SourceDoc', [Regexp(r'^[^/\\]+\.txt$')]) diff --git a/sourcing/mail.py b/sourcing/mail.py index f99ca77..77a96c6 100644 --- a/sourcing/mail.py +++ b/sourcing/mail.py @@ -1,49 +1,40 @@ -"""Send email.""" - -import smtplib -from email import charset +from flask import render_template, current_app from email.mime.text import MIMEText -from email.utils import formataddr, formatdate, make_msgid +from email.utils import formatdate, make_msgid +from email import charset +from email.utils import formataddr +import smtplib -from flask import current_app, render_template +charset.add_charset('utf-8', charset.SHORTEST, charset.QP) -from .model import User +def format_message(user, subject, body): + from_name = current_app.config['FROM_NAME'] + from_addr = current_app.config['FROM_ADDR'] -charset.add_charset("utf-8", charset.SHORTEST, charset.QP) - - -def format_message(user: User, subject: str, body: str) -> MIMEText: - """Format an email.""" - from_name = current_app.config["FROM_NAME"] - from_addr = current_app.config["FROM_ADDR"] - - msg = MIMEText(body, "plain", "UTF-8") - msg["Subject"] = subject - msg["To"] = formataddr((user.mail_to_name, user.email)) - msg["From"] = formataddr((from_name, from_addr)) - msg["Date"] = formatdate() - msg["Message-ID"] = make_msgid() + msg = MIMEText(body, 'plain', 'UTF-8') + msg['Subject'] = subject + msg['To'] = formataddr((user.mail_to_name, user.email)) + msg['From'] = formataddr((from_name, from_addr)) + msg['Date'] = formatdate() + msg['Message-ID'] = make_msgid() return msg - -def send_mail(user: User, subject: str, body: str) -> None: - """Send an email.""" - bounce_addr = current_app.config["FROM_ADDR"] +def send_mail(user, subject, body): + bounce_addr = current_app.config['FROM_ADDR'] msg = format_message(user, subject, body) msg_as_string = msg.as_string() - if not current_app.config["REALLY_SEND_MAIL"]: # during development + if not current_app.config['REALLY_SEND_MAIL']: # during development return - s = smtplib.SMTP("localhost") + s = smtplib.SMTP('localhost') s.sendmail(bounce_addr, [user.email], msg_as_string) s.quit() - -def send_signup_mail(user: User) -> None: - """Unused so far.""" - subject = "xanadu: verify your account" - body = render_template("mail/signup.txt", user=user) +def send_signup_mail(user): + ''' unused so far ''' + subject = u'xanadu: verify your account' + body = render_template('mail/signup.txt', user=user) send_mail(user, subject, body) diff --git a/sourcing/model.py b/sourcing/model.py index ed3eb06..09703dc 100644 --- a/sourcing/model.py +++ b/sourcing/model.py @@ -1,34 +1,20 @@ -"""Models.""" - -from __future__ import annotations - -import re -import typing - -from flask import current_app, url_for -from flask_login import UserMixin -from hashids import Hashids -from sqlalchemy import Column, ForeignKey, func -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.ext.hybrid import hybrid_property -from sqlalchemy.orm import configure_mappers, relationship, synonym, validates -from sqlalchemy.sql import exists -from sqlalchemy.types import ( - Boolean, - DateTime, - Enum, - Integer, - String, - Unicode, - UnicodeText, -) -from sqlalchemy_continuum import make_versioned -from sqlalchemy_continuum.plugins import ActivityPlugin, FlaskPlugin -from werkzeug.security import check_password_hash, generate_password_hash - +from flask import url_for, current_app from .database import session -from .parse import ParsedLink, SpanTuple, parse_link, parse_span +from .parse import parse_link, parse_sourcedoc_facet, parse_span from .text import first_non_empty_line +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, ForeignKey +from sqlalchemy.types import String, Unicode, Integer, DateTime, Boolean, UnicodeText, Enum +from sqlalchemy import func +from sqlalchemy.orm import relationship, validates, synonym, configure_mappers +from sqlalchemy.sql import exists +from flask_login import UserMixin +from werkzeug.security import generate_password_hash, check_password_hash +from sqlalchemy.ext.hybrid import hybrid_property +from sqlalchemy_continuum import make_versioned +from sqlalchemy_continuum.plugins import FlaskPlugin, ActivityPlugin +import re +from hashids import Hashids activity_plugin = ActivityPlugin() make_versioned(plugins=[FlaskPlugin(), activity_plugin]) @@ -38,79 +24,38 @@ doc_hashids = Hashids(min_length=8) Base = declarative_base() Base.query = session.query_property() -re_server_url = re.compile(r"^http://perma.pub/\d+/([^/]+)/([^/]+)$") +re_server_url = re.compile(r'^http://perma.pub/\d+/([^/]+)/([^/]+)$') # list of disallowed usernames - maybe this should be in the database -reserved_name = [ - "root", - "admin", - "administrator", - "support", - "info", - "test", - "tech", - "online", - "old", - "new", - "jobs", - "login", - "job", - "ipad" "iphone", - "javascript", - "script", - "host", - "mail", - "image", - "faq", - "file", - "ftp", - "error", - "warning", - "the", - "assistance", - "maintenance", - "controller", - "head", - "chief", - "anon", -] +reserved_name = ['root', 'admin', 'administrator', 'support', 'info', + 'test', 'tech', 'online', 'old', 'new', 'jobs', 'login', 'job', 'ipad' + 'iphone', 'javascript', 'script', 'host', 'mail', 'image', 'faq', + 'file', 'ftp', 'error', 'warning', 'the', 'assistance', 'maintenance', + 'controller', 'head', 'chief', 'anon'] -re_username = re.compile(r"^\w+$", re.U) +re_username = re.compile(r'^\w+$', re.U) re_full_name = re.compile(r'^([-.\'" ]|[^\W\d_])+$', re.U) -re_comment = re.compile(r"#.*") - -AllTitles = dict["Item", str] - def item_url(): - return url_for("view.view_item", username=self.user.username, hashid=self.hashid) + return url_for('view.view_item', + username=self.user.username, + hashid=self.hashid) -def user_exists(field: Column[str], value: str) -> bool: - """Check that user exists.""" - return typing.cast(bool, session.query(exists().where(field == value)).scalar()) - +def user_exists(field, value): + return session.query(exists().where(field == value)).scalar() class TimeStampedModel(Base): - """Time stamped model.""" - __abstract__ = True created = Column(DateTime, default=func.now()) modified = Column(DateTime, default=func.now(), onupdate=func.now()) - class LoginError(Exception): - """Login error.""" - - def __init__(self, msg: str): - """Init.""" + def __init__(self, msg): self.msg = msg - class User(TimeStampedModel, UserMixin): - """User model.""" - - __tablename__ = "user" + __tablename__ = 'user' id = Column(Integer, primary_key=True) username = Column(Unicode(32), unique=True, nullable=False) pw_hash = Column(String(160), nullable=False) @@ -123,358 +68,269 @@ class User(TimeStampedModel, UserMixin): full_name = Column(Unicode(64)) balance = Column(Integer, nullable=False, default=0) - user_id = synonym("id") - name = synonym("full_name") - user_name = synonym("username") + user_id = synonym('id') + name = synonym('full_name') + user_name = synonym('username') def __init__(self, **kwargs): - """Init.""" - pw_hash = generate_password_hash(kwargs.pop("password")) + pw_hash = generate_password_hash(kwargs.pop('password')) return super(User, self).__init__(pw_hash=pw_hash, **kwargs) - def __repr__(self) -> str: - """Repr.""" - return "".format(self.username) + def __repr__(self): + return ''.format(self.username) - def set_password(self, password: str) -> None: - """Set password.""" + def set_password(self, password): self.pw_hash = generate_password_hash(password) - def check_password(self, password: str) -> bool: - """Check password.""" + def check_password(self, password): return check_password_hash(self.pw_hash, password) - def get_id(self) -> int: - """Get ID.""" + def get_id(self): return self.id - @validates("email") - def validate_email(self, key, value: str) -> str: - """Validate email.""" - assert "@" in value + @validates('email') + def validate_email(self, key, value): + assert '@' in value return value - @validates("username") + @validates('username') def validate_usernane(self, key, value): - """Validate username.""" assert re_username.match(value) return value - @validates("full_name") + @validates('full_name') def validate_full_name(self, key, value): if value: assert re_full_name.match(value) return value @hybrid_property - def is_live(self) -> bool: - """User account is live.""" - return bool(self.email_verified & ~self.disabled & ~self.deleted) + def is_live(self): + return self.email_verified & ~self.disabled & ~self.deleted @classmethod - def lookup_user_or_email(cls, user_or_email: str) -> User: - """Lookup user or email.""" - field = cls.email if "@" in user_or_email else cls.username - return typing.cast(User, cls.query.filter(field == user_or_email).one_or_none()) + def lookup_user_or_email(cls, user_or_email): + field = cls.email if '@' in user_or_email else cls.username + return cls.query.filter(field == user_or_email).one_or_none() @property - def mail_to_name(self) -> str: - """Name to use on e-mails sent to the user.""" + def mail_to_name(self): + '''Name to use on e-mails sent to the user.''' return self.full_name or self.username @classmethod - def attempt_login(cls, user_or_email: str, password: str) -> User: - """Attempt login.""" + def attempt_login(cls, user_or_email, password): user = cls.lookup_user_or_email(user_or_email) if not user: - raise LoginError("user not found") + raise LoginError('user not found') if user.disabled: - raise LoginError("user account disabled") + raise LoginError('user account disabled') if not user.check_password(password): - raise LoginError("incorrect password") + raise LoginError('incorrect password') return user - class Reference(Base): - __tablename__ = "reference" - subject_id = Column(Integer, ForeignKey("item.id"), primary_key=True) - object_id = Column(Integer, ForeignKey("item.id"), primary_key=True) - + __tablename__ = 'reference' + subject_id = Column(Integer, ForeignKey('item.id'), primary_key=True) + object_id = Column(Integer, ForeignKey('item.id'), primary_key=True) class Item(TimeStampedModel): - __tablename__ = "item" - __versioned__ = {"base_classes": (TimeStampedModel,)} + __tablename__ = 'item' + __versioned__ = {'base_classes': (TimeStampedModel,)} id = Column(Integer, primary_key=True) - user_id = Column(Integer, ForeignKey("user.id")) + user_id = Column(Integer, ForeignKey('user.id')) published = Column(DateTime) - type = Column( - Enum("sourcedoc", "xanapage", "xanalink", name="item_type"), nullable=False - ) + type = Column(Enum('sourcedoc', 'xanapage', 'xanalink', name='item_type'), + nullable=False) filename = Column(Unicode) text = Column(UnicodeText) - subjects = relationship( - "Item", - lazy="dynamic", - secondary="reference", - primaryjoin=id == Reference.object_id, - secondaryjoin=id == Reference.subject_id, - ) - objects = relationship( - "Item", - lazy="dynamic", - secondary="reference", - primaryjoin=id == Reference.subject_id, - secondaryjoin=id == Reference.object_id, - ) - user = relationship("User", backref="items", lazy="select") + subjects = relationship('Item', + lazy='dynamic', + secondary='reference', + primaryjoin=id == Reference.object_id, + secondaryjoin=id == Reference.subject_id) + objects = relationship('Item', + lazy='dynamic', + secondary='reference', + primaryjoin=id == Reference.subject_id, + secondaryjoin=id == Reference.object_id) + user = relationship('User', backref='items') __mapper_args__ = { - "polymorphic_on": type, - "with_polymorphic": "*", + 'polymorphic_on': type, + 'with_polymorphic': '*', } @property - def hashid(self) -> str: - """Hashid for item.""" - return typing.cast(str, doc_hashids.encode(self.id)) + def hashid(self): + return doc_hashids.encode(self.id) @classmethod - def get_by_hashid(cls, hashid: str) -> Item | None: - """Return the item with the given hashid.""" + def get_by_hashid(cls, hashid): try: item_id = doc_hashids.decode(hashid)[0] except IndexError: - return None - return typing.cast("Item", cls.query.get(item_id)) + return + return cls.query.get(item_id) - def view_url(self, endpoint: str, **kwargs) -> str: - return url_for( - "view." + endpoint, - username=self.user.username, - hashid=self.hashid, - **kwargs, - ) + def view_url(self, endpoint, **kwargs): + return url_for('view.' + endpoint, + username=self.user.username, + hashid=self.hashid, + **kwargs) @property - def url(self) -> str: - """URL for to view this item.""" - return self.view_url("view_item") + def url(self): + return self.view_url('view_item') - def url_fragment(self) -> str: - """URL fragment.""" - return self.user.username + "/" + self.hashid + def url_fragment(self): + return self.user.username + '/' + self.hashid - def version_url(self, version: int) -> str: - """URL for version.""" - return self.view_url("view_item", v=version) + def version_url(self, version): + return self.view_url('view_item', v=version) @property - def history_url(self) -> str: - """History URL.""" - return self.view_url("history") + def history_url(self): + return self.view_url('history') @property - def external_url(self) -> str: - """External URL.""" - base_url = current_app.config.get("BASE_URL") - assert base_url and isinstance(base_url, str) - if not base_url.endswith("/"): - base_url += "/" + def external_url(self): + base_url = current_app.config.get('BASE_URL') + if not base_url.endswith('/'): + base_url += '/' if base_url: return base_url + self.url_fragment() else: - return self.view_url("view_item", _external=True) + return self.view_url('view_item', _external=True) @property - def edit_url(self) -> str: - """Edit URL.""" - return self.view_url("edit_item") + def edit_url(self): + return self.view_url('edit_item') @property - def set_title_url(self) -> str: - """Set title URL.""" - return self.view_url("set_title") + def set_title_url(self): + return self.view_url('set_title') - def title_from_link(self, titles: AllTitles | None = None) -> str | None: - """Get title from link.""" - if titles is None: + def title_from_link(self, titles=None): + if not titles: titles = XanaLink.get_all_titles() return titles.get(self) - def title(self, titles: AllTitles | None = None) -> str: - """Get title.""" - return self.type + ": " + (self.title_from_link(titles) or self.hashid) + def title(self, titles=None): + return self.type + ': ' + (self.title_from_link(titles) or self.hashid) - def has_title(self) -> bool: - """Item has a title.""" + def has_title(self): titles = XanaLink.get_all_titles() return self in titles - def set_title(self, title: str, user: User) -> None: - """Set item title.""" + def set_title(self, title, user): title_source_doc = SourceDoc(text=title, user=user) session.add(title_source_doc) session.commit() - link_text = """type=title + link_text = '''type=title facet= sourcedoc: {} facet= -span: {},start=0,length={}""".format( - self.external_url, title_source_doc.external_url, len(title) - ) +span: {},start=0,length={}'''.format(self.external_url, title_source_doc.external_url, len(title)) title_link = XanaLink(text=link_text, user=user) session.add(title_link) session.commit() @classmethod - def from_external(cls, url: str, home: str | None = None) -> None | Item: - """Get item from URL.""" - - username: str | None - hashid: str | None - - parts = url.split("/") - username, hashid = parts[-2:] - item_id = doc_hashids.decode(hashid)[0] - q = cls.query.filter(User.username == username, cls.id == item_id) - item = q.one_or_none() - if item: - return typing.cast(Item, item) - - base = current_app.config.get("BASE_URL") + def from_external(cls, url, home=None): + base = current_app.config.get('BASE_URL') username, hashid = None, None if home is None: - home = url_for("view.home", _external=True) + home = url_for('view.home', _external=True) if url.startswith(home): - username, _, hashid = url[len(home) :].partition("/") + username, _, hashid = url[len(home):].partition('/') elif base and url.startswith(base): - username, _, hashid = url[len(base) :].lstrip("/").partition("/") + username, _, hashid = url[len(base):].lstrip('/').partition('/') - if username and "/" in username or hashid and "/" in hashid: + if username and '/' in username or hashid and '/' in hashid: username, hashid = None, None if not username or not hashid: m = re_server_url.match(url) if not m: - return None + return username, hashid = m.groups() item_id = doc_hashids.decode(hashid)[0] q = cls.query.filter(User.username == username, cls.id == item_id) return q.one_or_none() - class XanaPage(Item): - __tablename__ = "xanapage" - __mapper_args__ = {"polymorphic_identity": "xanapage"} + __tablename__ = 'xanapage' + __mapper_args__ = {'polymorphic_identity': 'xanapage'} id = Column(Integer, ForeignKey(Item.id), primary_key=True) - def snippet(self) -> str: - """Snippet of text.""" + def snippet(self): return self.text @property - def xanaedit_url(self) -> str: - """XanaExit URL.""" - return self.view_url("xanaedit_item") + def xanaedit_url(self): + return self.view_url('xanaedit_item') @property - def save_xanaedit_url(self) -> str: - """XanaExit save URL.""" - return self.view_url("save_xanaedit") - - def iter_spans(self) -> typing.Iterator[SpanTuple]: - """Span iterator.""" - assert self.text is not None - for line in self.text.splitlines(): - line = re_comment.sub("", line).strip() - if not line: - continue - span_pointer = parse_span(line) - if span_pointer: - yield span_pointer - - def update_references(self) -> None: - """Update references.""" - for url, start, length in self.iter_spans(): - src_doc = Item.from_external(url) - if not src_doc or not src_doc.id: - continue - existing = Reference.query.get((self.id, src_doc.id)) - if existing: - continue - ref = Reference(subject_id=self.id, object_id=src_doc.id) - session.add(ref) - session.commit() - + def save_xanaedit_url(self): + return self.view_url('save_xanaedit') class XanaLink(Item): - """XanaLink.""" - - __tablename__ = "xanalink" - __mapper_args__ = {"polymorphic_identity": "xanalink"} + __tablename__ = 'xanalink' + __mapper_args__ = {'polymorphic_identity': 'xanalink'} id = Column(Integer, ForeignKey(Item.id), primary_key=True) - def parse(self) -> ParsedLink: - """Parse link.""" - assert self.text is not None + def parse(self): return parse_link(self.text) @property - def link_type(self) -> str: - """Get link type.""" - return self.parse()["type"] + def link_type(self): + return self.parse()['type'] - def title(self, titles: AllTitles | None = None) -> str: - """Title of link.""" + def title(self, titles=None): if titles is None: titles = XanaLink.get_all_titles() if self in titles: - return self.type + ": " + titles[self] + return self.type + ': ' + titles[self] parsed = self.parse() - assert isinstance(parsed["type"], str) - if parsed["type"] == "title": - ident = parsed["facets"][0][0].partition(": ")[2] + if parsed['type'] == 'title': + ident = parsed['facets'][0][0].partition(': ')[2] item = Item.from_external(ident) if item in titles: - return parsed["type"] + " link for " + item.title(titles=titles) + return parsed['type'] + " link for " + item.title(titles=titles) - if parsed["type"]: - return parsed["type"] + " link: " + self.hashid + if parsed['type']: + return parsed['type'] + " link: " + self.hashid else: return "link: " + self.hashid def item_and_title(self, home=None): link = self.parse() - if link["type"] != "title": + if link['type'] != 'title': return try: - facet1, facet2 = link["facets"] + facet1, facet2 = link['facets'] except ValueError: return - link_type, _, ident = facet1[0].partition(": ") + link_type, _, ident = facet1[0].partition(': ') item = Item.from_external(ident, home) - try: - ident2, start, length = parse_span(facet2[0]) - except TypeError: - return - + ident2, start, length = parse_span(facet2[0]) source_of_title = SourceDoc.from_external(ident2, home) if source_of_title: - return (item, source_of_title.text[start : length + start]) + return(item, source_of_title.text[start:length + start]) @classmethod - def get_all_titles(cls, home: str | None = None) -> AllTitles: - """Get all known titles.""" + def get_all_titles(cls, home=None): titles = {} for link in cls.query: ret = link.item_and_title(home) @@ -484,14 +340,12 @@ class XanaLink(Item): titles[item] = title return titles - def snippet(self) -> str | None: - """Snippet of text.""" + def snippet(self): return self.text - class SourceDoc(Item): - __tablename__ = "sourcedoc" - __mapper_args__ = {"polymorphic_identity": "sourcedoc"} + __tablename__ = 'sourcedoc' + __mapper_args__ = {'polymorphic_identity': 'sourcedoc'} id = Column(Integer, ForeignKey(Item.id), primary_key=True) db_price_per_character = Column(Integer) @@ -499,61 +353,44 @@ class SourceDoc(Item): @property def document_price(self): - assert self.text is not None return self.db_document_price or self.db_price_per_character * len(self.text) @property def price_per_character(self): - assert self.text is not None return self.db_price_per_character or self.db_document_price / len(self.text) - def snippet( - self, - length: int = 255, - killwords: bool = False, - end: str = "...", - leeway: int = 5, - ) -> str: - """Get snippet of text.""" - assert self.text is not None + def snippet(self, length=255, killwords=False, end='...', leeway=5): s = self.text - assert length >= len(end), "expected length >= %s, got %s" % (len(end), length) - assert leeway >= 0, "expected leeway >= 0, got %s" % leeway + assert length >= len(end), 'expected length >= %s, got %s' % (len(end), length) + assert leeway >= 0, 'expected leeway >= 0, got %s' % leeway if len(s) <= length + leeway: return s if killwords: - return s[: length - len(end)] + end - result = s[: length - len(end)].rsplit(" ", 1)[0] + return s[:length - len(end)] + end + result = s[:length - len(end)].rsplit(' ', 1)[0] return result + end - def raw_title(self) -> str: - """Raw title.""" + def raw_title(self): return self.title(with_type=False) - def title(self, titles: AllTitles | None = None, with_type: bool = True) -> str: - """Source document title.""" - start = self.type + ": " if with_type else "" - if titles is None: - titles = XanaLink.get_all_titles() + def title(self, titles=None, with_type=True): + start = self.type + ': ' if with_type else '' + titles = XanaLink.get_all_titles() from_link = self.title_from_link(titles=titles) if from_link: return start + from_link - assert self.text is not None first_line = first_non_empty_line(self.text) if first_line: return start + first_line return start + self.hashid @property - def create_xanapage_url(self) -> str: - """Create xanapage URL.""" - return self.view_url("create_xanapage_from_sourcedoc") + def create_xanapage_url(self): + return self.view_url('create_xanapage_from_sourcedoc') @property - def entire_span(self) -> str: - """Entire span.""" - assert self.text is not None - return self.external_url + f",start=0,length={len(self.text)}" + def entire_span(self): + return self.external_url + f',start=0,length={len(self.text)}' configure_mappers() diff --git a/sourcing/parse.py b/sourcing/parse.py index f4814b1..7421674 100644 --- a/sourcing/parse.py +++ b/sourcing/parse.py @@ -1,60 +1,34 @@ -import os.path import re -import typing +import os.path from html import escape -from .types import ExternalText - -re_span_pointer = re.compile(r"span: (.*),start=(\d+),length=(\d+)") -re_xanalink = re.compile(r"xanalink: +([^ ]+) *$") -re_facet = re.compile(r"^facet\d* *=\s*(.*)\s*$") -re_comment = re.compile(r"#.*") +re_span_pointer = re.compile(r'span: (.*),start=(\d+),length=(\d+)') +re_xanalink = re.compile('xanalink: +([^ ]+) *$') +re_facet = re.compile('^facet\d* *=\s*(.*)\s*$') +re_comment = re.compile(r'#.*') project_dir = os.path.dirname(os.path.dirname(__file__)) xnb_per_char = 150000 - -class SpanContents(typing.TypedDict): - """Contents of a span.""" - - url: str - start: int - length: int - text: str - - -SpanTuple = tuple[str, int, int] - -SourceText = dict[str, ExternalText] - -ParsedLink = dict[str, str | None | list[list[str]]] - - -def parse_span(line: str) -> None | SpanTuple: - """Parse a span.""" +def parse_span(line): m = re_span_pointer.match(line) if not m: return None return (m.group(1), int(m.group(2)), int(m.group(3))) - -def get_span(text: SourceText, url: str, start: int, length: int) -> SpanContents: - """Get span from source text.""" +def get_span(text, url, start, length): return { - "url": url, - "start": start, - "length": length, - "text": text[url]["text"][start : start + length], + 'url': url, + 'start': start, + 'length': length, + 'text': text[url]['text'][start:start + length] } - -def get_urls(spans: list[SpanTuple]) -> set[str]: - """Get URLs from span tuples.""" +def get_urls(spans): return {i[0] for i in spans} - -def find_min_max(spans: list[SpanTuple], source): +def find_min_max(spans, source): text_min, text_max = {}, {} for url, start, length in spans: if url in text_min: @@ -68,85 +42,67 @@ def find_min_max(spans: list[SpanTuple], source): text_max[url] = start + length for s in source: - url = s["url"] - s["min"] = text_min[url] - s["max"] = text_max[url] + url = s['url'] + s['min'] = text_min[url] + s['max'] = text_max[url] +def span_html(span_type, num): + return ''.format(num=num, span_type=span_type) -def span_html(span_type: str, num: int) -> str: - """Open span tag.""" - return f'' - - -def xanapage_span_html( - num: int, - text: str, - url: str, - start: int, - length: int, - highlight: bool = True, - censor: bool = False, -) -> str: - """Generate HTML to represent a span.""" +def xanapage_span_html(num, text, url, start, length, highlight=True, censor=False): cls = [] if highlight: - cls = ["xanapagetransclusion", "transclusion"] - html_class = f""" class="{' '.join(cls)}""" if cls else "" + cls = ['xanapagetransclusion', 'transclusion'] + html_class = ' class="{}"'.format(' '.join(cls)) if cls else '' - html = ( - f'{text}' - ) + html = '{}'.format(num, html_class, escape(url), start, length, text) if censor: - return '' + html + "" + return '' + html + '' else: return html - -def parse_sourcedoc_facet(facet: list[str]) -> str: - """Parse sourcedoc facet.""" +def parse_sourcedoc_facet(facet): leg = facet[0] - prefix = "sourcedoc: " + prefix = 'sourcedoc: ' assert leg.startswith(prefix) - return leg[len(prefix) :] + return leg[len(prefix):] - -def parse_xanapage_facet(facet: list[str]) -> str: - """Parse xanapage facet.""" +def parse_xanapage_facet(facet): leg = facet[0] - prefix = "xanapage: " + prefix = 'xanapage: ' assert leg.startswith(prefix) - return leg[len(prefix) :] + return leg[len(prefix):] - -def parse_link(link_text: str) -> ParsedLink: +def parse_link(link_text): link_type = None - expect = "link_type" + expect = 'link_type' facets = [] for line in link_text.splitlines(): - line = re_comment.sub("", line).strip() + line = re_comment.sub('', line).strip() if not line: continue - if expect == "link_type": - if line.startswith("type="): + if expect == 'link_type': + if line.startswith('type='): link_type = line[5:] - expect = "facets" + expect = 'facets' continue - if expect != "facets": + if expect != 'facets': # print("unrecognized:", line) continue m = re_facet.match(line) if m: - legs: list[str] = [] + legs = [] facets.append(legs) if m.group(1): line = m.group(1) else: continue - if legs and legs[-1] == "span:" and line.startswith("http"): - legs[-1] += " " + line + if legs and legs[-1] == 'span:' and line.startswith('http'): + legs[-1] += ' ' + line else: legs.append(line.strip()) - return {"type": link_type, "facets": facets} + return {'type': link_type, 'facets': facets} + + diff --git a/sourcing/span.py b/sourcing/span.py index 9c3b164..5d0a6e4 100644 --- a/sourcing/span.py +++ b/sourcing/span.py @@ -1,35 +1,21 @@ -"""Span.""" - -import typing - import attr -import attr._make - -def greater_than_zero(instance: "Span", attribute: typing.Any, value: int) -> None: - """Value is greater than zero.""" +def greater_than_zero(instance, attribute, value): if value <= 0: - raise ValueError("must be greater than 0") + raise ValueError('must be greater than 0') - -def is_positive(instance: "Span", attribute: typing.Any, value: int) -> None: - """Value is positive.""" +def is_positive(instance, attribute, value): if value < 0: - raise ValueError("must be positive") - + raise ValueError('must be positive') @attr.s class Span: - """Span.""" - - url: str = attr.ib() + url: int = attr.ib() start: int = attr.ib(validator=is_positive) length: int = attr.ib(validator=greater_than_zero) def end(self) -> int: - """End position of span.""" return self.start + self.length def for_edl(self) -> str: - """Generate URL parameters for EDL.""" - return f"{self.url},start={self.start},length={self.length}" + return f'{self.url},start={self.start},length={self.length}' diff --git a/sourcing/static/bootstrap b/sourcing/static/bootstrap index 12ef71f..fe0f86b 120000 --- a/sourcing/static/bootstrap +++ b/sourcing/static/bootstrap @@ -1 +1 @@ -/usr/share/javascript/bootstrap4 \ No newline at end of file +/usr/share/javascript/bootstrap \ No newline at end of file diff --git a/sourcing/templates/base.html b/sourcing/templates/base.html index f277de6..c82a101 100644 --- a/sourcing/templates/base.html +++ b/sourcing/templates/base.html @@ -7,9 +7,7 @@ {% block title %}Xanadu{% endblock %} -{# #} - - + {% block style %} @@ -30,12 +28,8 @@ -{# -#} - - {% block scripts %} {% endblock %} diff --git a/sourcing/templates/home.html b/sourcing/templates/home.html index 27ef2f4..1485843 100644 --- a/sourcing/templates/home.html +++ b/sourcing/templates/home.html @@ -68,12 +68,8 @@ {% for doc in docs %} - - {{ doc.title(titles=titles) }} - {# - — {{ doc.user.username }} — {{ doc.created | datetime }} - #} - + {{ doc.title() }} + — {{ doc.user.username }} — {{ doc.created | datetime }} {%- for line in doc.snippet().splitlines() -%} @@ -83,7 +79,6 @@ {% endfor %} - {{ new_buttons() }} @@ -103,8 +98,7 @@ var show_type = document.getElementById('type_' + doc['type']).checked; var show_user = document.getElementById('user_' + doc['user']).checked; var show_year = document.getElementById('year_' + doc['year']).checked; - var show_link_type = (doc['type'] != 'xanalink' || - (doc['link_type'] && document.getElementById('link_type_' + doc['link_type']).checked)); + var show_link_type = doc['type'] != 'xanalink' || document.getElementById('link_type_' + doc['link_type']).checked; element.toggle(show_type && show_user && show_link_type && show_year); }); diff --git a/sourcing/templates/navbar-pale-fire.html b/sourcing/templates/navbar-pale-fire.html index a2addfe..715a618 100644 --- a/sourcing/templates/navbar-pale-fire.html +++ b/sourcing/templates/navbar-pale-fire.html @@ -3,7 +3,7 @@ - Xanaflight: Pale Fire, by Nabokov + Xanaflight: Three pages diff --git a/sourcing/templates/view.html b/sourcing/templates/view.html index efba47c..eaed120 100644 --- a/sourcing/templates/view.html +++ b/sourcing/templates/view.html @@ -94,7 +94,7 @@ {% if doc.type == 'xanapage' %} - Xanaviewer + In view {# diff --git a/sourcing/text.py b/sourcing/text.py index a3bf9b0..51fdaa3 100644 --- a/sourcing/text.py +++ b/sourcing/text.py @@ -1,18 +1,12 @@ -"""Text utility functions.""" - -import random import re -import typing +import random -re_newline = re.compile("\r?\n") +re_newline = re.compile('\r?\n') - -def find_newlines(text: str) -> typing.Iterator[int]: - """Find position of newlines in a piece of text.""" +def find_newlines(text): return (m.end(0) for m in re_newline.finditer(text)) - -def iter_lines(text: str) -> typing.Iterator[tuple[int, str]]: +def iter_lines(text): start = 0 for m in re_newline.finditer(text): end = m.end(0) @@ -21,42 +15,33 @@ def iter_lines(text: str) -> typing.Iterator[tuple[int, str]]: if start < len(text) - 1: yield (start, text[start:]) - -def censor_text(text: str) -> str: - """Censor text with a random character.""" - - def random_chr() -> str: +def censor_text(text): + def random_chr(): return chr(random.randint(9728, 9983)) + return ''.join(random_chr() if c.isalnum() else c for c in text) - return "".join(random_chr() if c.isalnum() else c for c in text) - - -def add_highlight(text: str, span_start: int, span_length: int): +def add_highlight(text, span_start, span_length): span_end = span_start + span_length for start, line in iter_lines(text): end = start + len(line) if not ((end > span_start) and (start < span_end)): - yield start, [{"text": line}] + yield start, [{'text': line}] continue within_line = span_start - start cur = [] if within_line > 0: before = line[:within_line] - cur.append({"text": before}) - cur.append({"highlight": line[max(within_line, 0) : within_line + span_length]}) - after = line[within_line + span_length :] + cur.append({'text': before}) + cur.append({'highlight': line[max(within_line, 0):within_line + span_length]}) + after = line[within_line + span_length:] if after: - cur.append({"text": after}) + cur.append({'text': after}) yield start, cur - -def first_non_empty_line(text: str) -> str | None: - """First non-empty line.""" +def first_non_empty_line(text): for start, cur in iter_lines(text): tidy = cur.strip() if tidy: return tidy - - return None diff --git a/sourcing/url.py b/sourcing/url.py index a76a65b..a4f733b 100644 --- a/sourcing/url.py +++ b/sourcing/url.py @@ -1,53 +1,42 @@ +import requests +from .model import Item import os.path import re -import typing - -import requests - -from .model import Item -from .types import ExternalText project_dir = os.path.dirname(os.path.dirname(__file__)) -cache_location = os.path.join(project_dir, "cache") +cache_location = os.path.join(project_dir, 'cache') -re_colon_slash = re.compile("[/:]+") +re_colon_slash = re.compile('[/:]+') +def url_filename(url): + return re_colon_slash.sub('_', url) -def url_filename(url: str) -> str: - """Generate filename from URL.""" - return re_colon_slash.sub("_", url) - - -def get_text(url: str) -> ExternalText: - """Get text from URL and return as dict.""" +def get_text(url): # assume UTF-8 text = get_url(url) - heading = url.rsplit("/", 1)[-1] + heading = url.rsplit('/', 1)[-1] return { - "url": url, - "text": text, - "heading": heading, - "length": len(text), + 'url': url, + 'text': text, + 'heading': heading, + 'length': len(text), } - -def get_url(url: str) -> str: - """Read a URL and return the content.""" +def get_url(url): item = Item.from_external(url) if item: - return typing.cast(str, item.text) - + return item.text content = requests.get(url).content - return content.decode(errors="replace") + return content.decode(errors='replace') filename = os.path.join(cache_location, url_filename(url)) if os.path.exists(filename): - content = open(filename, "rb").read() + content = open(filename, 'rb').read() else: content = requests.get(url).content - open(filename, "wb").write(content) + open(filename, 'wb').write(content) - return content.decode(errors="replace") + return content.decode(errors='replace') diff --git a/sourcing/utils.py b/sourcing/utils.py index e6d903a..1ad86fe 100644 --- a/sourcing/utils.py +++ b/sourcing/utils.py @@ -1,29 +1,26 @@ -from datetime import date, datetime, timedelta - import humanize +from datetime import date, timedelta +from jinja2 import Markup - -def display_datetime(dt: datetime) -> str: - """Render datetime as a string for display.""" +def display_datetime(dt): if dt is None: - return "n/a" + return 'n/a' today = date.today() if today - dt.date() < timedelta(days=1): return humanize.naturaltime(dt) else: - return dt.strftime("%a, %d %b %Y") + return dt.strftime('%a, %d %b %Y') - -def nbsp_at_start(line: str) -> str: - """Protect spaces at the start of a string.""" +def nbsp_at_start(line): + ''' Protect spaces at the start of a string. ''' space_count = 0 for c in line: - if c != " ": + if c != ' ': break space_count += 1 # return Markup(' ') * space_count + line[space_count:] - return "\u00A0" * space_count + line[space_count:] + return '\u00A0' * space_count + line[space_count:] +def protect_start_spaces(text): + return '\n'.join(nbsp_at_start(line) for line in text.splitlines()) -def protect_start_spaces(text: str) -> str: - return "\n".join(nbsp_at_start(line) for line in text.splitlines()) diff --git a/sourcing/view.py b/sourcing/view.py index 96f60f1..ab465de 100644 --- a/sourcing/view.py +++ b/sourcing/view.py @@ -1,63 +1,44 @@ -import json -import re -import typing -from functools import wraps - -import flask -from flask_login import ( - LoginManager, - current_user, - login_required, - login_user, - logout_user, -) -from itsdangerous import URLSafeTimedSerializer -from werkzeug.wrappers import Response - -from .database import session -from .edit import apply_edits -from .edl import fulfil_edl, fulfil_edl_with_links, fulfil_edl_with_sources, parse_edl -from .forms import ( - AccountSettingsForm, - ForgotPasswordForm, - ItemForm, - LoginForm, - PasswordForm, - SignupForm, - SourceDocForm, -) -from .mail import send_mail -from .model import Item, Reference, SourceDoc, User, XanaLink, XanaPage -from .parse import parse_link, parse_xanapage_facet -from .span import Span -from .text import add_highlight, iter_lines +from flask import (Blueprint, render_template, request, redirect, flash, + url_for, abort, jsonify, Response, current_app) +from flask_login import (login_user, current_user, logout_user, + login_required, LoginManager) +from .forms import (LoginForm, SignupForm, AccountSettingsForm, + UploadSourceDocForm, SourceDocForm, ItemForm, + ForgotPasswordForm, PasswordForm) +from .model import User, SourceDoc, Item, XanaPage, XanaLink, Reference +from .parse import parse_xanapage_facet, parse_link from .url import get_url +from .mail import send_mail +from .edl import fulfil_edl_with_sources, fulfil_edl, parse_edl, fulfil_edl_with_links +from .span import Span +from .edit import apply_edits +from .database import session +from .text import iter_lines, add_highlight +from werkzeug.debug.tbtools import get_current_traceback +from jinja2 import evalcontextfilter, Markup +from functools import wraps from .utils import nbsp_at_start - -# from werkzeug.debug.tbtools import get_current_traceback -# from jinja2 import evalcontextfilter, Markup +from itsdangerous import URLSafeTimedSerializer # from sqlalchemy_continuum import version_class +import json +import re login_manager = LoginManager() -login_manager.login_view = ".login" -re_paragraph = re.compile(r"(?:\r\n|\r|\n){2,}") -re_spanpointer = re.compile(r"([A-Za-z0-9]+),start=(\d+),length=(\d+)") -bp = flask.Blueprint("view", __name__) +login_manager.login_view = '.login' +re_paragraph = re.compile(r'(?:\r\n|\r|\n){2,}') +re_spanpointer = re.compile(r'([A-Za-z0-9]+),start=(\d+),length=(\d+)') +bp = Blueprint('view', __name__) - -def init_app(app: flask.Flask) -> None: - """Initialise app.""" +def init_app(app): login_manager.init_app(app) app.register_blueprint(bp) - -# @app.template_filter() -# @evalcontextfilter -# def newline_html(eval_ctx, value): -# return u'\n\n'.join(Markup(u'') + p.replace('\n', Markup('')) + Markup(u'') -# for p in re_paragraph.split(value)) - + @app.template_filter() + @evalcontextfilter + def newline_html(eval_ctx, value): + return u'\n\n'.join(Markup(u'') + p.replace('\n', Markup('')) + Markup(u'') + for p in re_paragraph.split(value)) @login_manager.user_loader def load_user(user_id): @@ -65,332 +46,270 @@ def load_user(user_id): # where do we redirect after signup is complete -view_after_signup = ".home" - +view_after_signup = '.home' @bp.context_processor def inject_user(): return dict(current_user=current_user) - def show_errors(f): @wraps(f) def wrapper(*args, **kwargs): try: return f(*args, **kwargs) except Exception: - traceback = get_current_traceback( - skip=1, show_hidden_frames=False, ignore_system_exceptions=True - ) - return traceback.render_full().encode("utf-8", "replace") - + traceback = get_current_traceback(skip=1, show_hidden_frames=False, + ignore_system_exceptions=True) + return traceback.render_full().encode('utf-8', 'replace') return wrapper - -@bp.route("/") -def home() -> str: - """Home page.""" +@bp.route('/') +def home(): docs = Item.query.order_by(Item.created) docs_info = [] for item in docs: cur = { - "user": item.user.username, - "id": item.id, - "type": item.type, - "year": item.created.year, + 'user': item.user.username, + 'id': item.id, + 'type': item.type, + 'year': item.created.year, } - if item.type == "xanalink": - cur["link_type"] = item.link_type + if item.type == 'xanalink': + cur['link_type'] = item.link_type docs_info.append(cur) users = [item_user.username for item_user in User.query] - years = sorted({item["year"] for item in docs_info}) + years = sorted({item['year'] for item in docs_info}) - link_types = {item["link_type"] for item in docs_info if item.get("link_type")} + link_types = {item['link_type'] for item in docs_info if item.get('link_type')} - titles = XanaLink.get_all_titles() + return render_template('home.html', + docs=docs, + link_types=link_types, + years=years, + docs_info=docs_info, + nbsp_at_start=nbsp_at_start, + users=users) - return flask.render_template( - "home.html", - docs=docs, - link_types=link_types, - titles=titles, - years=years, - docs_info=docs_info, - nbsp_at_start=nbsp_at_start, - users=users, - ) - - -@bp.route("/password_reset", methods=["GET", "POST"]) -def password_reset() -> str | Response: - """Password reset.""" - site_name = "perma.pub" # FIXME: move to config +@bp.route('/password_reset', methods=['GET', 'POST']) +def password_reset(): + site_name = 'perma.pub' # FIXME: move to config form = ForgotPasswordForm() if not form.validate_on_submit(): - return flask.render_template("auth/password_reset.html", form=form) - ts = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"]) + return render_template('auth/password_reset.html', form=form) + ts = URLSafeTimedSerializer(current_app.config["SECRET_KEY"]) user = User.lookup_user_or_email(form.user_or_email.data) if user: - token = ts.dumps(user.id, salt="password-reset") - reset_link = flask.url_for(".reset_with_token", token=token, _external=True) - reset_mail = flask.render_template( - "mail/password_reset.txt", - reset_link=reset_link, - site_name=site_name, - user=user, - ) - subject = "Password reset on " + site_name + token = ts.dumps(user.id, salt='password-reset') + reset_link = url_for('.reset_with_token', token=token, _external=True) + reset_mail = render_template('mail/password_reset.txt', + reset_link=reset_link, + site_name=site_name, + user=user) + subject = 'Password reset on ' + site_name send_mail(user, subject, reset_mail) - return flask.redirect(flask.url_for(".password_reset_sent")) + return redirect(url_for('.password_reset_sent')) +@bp.route('/password_reset/sent', methods=['GET', 'POST']) +def password_reset_sent(): + return render_template('auth/password_reset_sent.html') -@bp.route("/password_reset/sent", methods=["GET", "POST"]) -def password_reset_sent() -> str: - """Password reset form.""" - return flask.render_template("auth/password_reset_sent.html") - - -@bp.route("/reset/", methods=["GET", "POST"]) -def reset_with_token(token: str) -> str | Response: - ts = URLSafeTimedSerializer(flask.current_app.config["SECRET_KEY"]) +@bp.route('/reset/', methods=['GET', 'POST']) +def reset_with_token(token): + ts = URLSafeTimedSerializer(current_app.config["SECRET_KEY"]) try: - user_id = ts.loads(token, salt="password-reset", max_age=86400) + user_id = ts.loads(token, salt='password-reset', max_age=86400) except Exception: - flask.abort(404) + abort(404) form = PasswordForm() if not form.validate_on_submit(): - return flask.render_template("auth/password_reset_confirm.html", form=form) + return render_template('auth/password_reset_confirm.html', form=form) user = User.query.get(user_id) user.set_password(form.password.data) session.add(user) session.commit() - return flask.redirect(flask.url_for(".password_reset_complete")) + return redirect(url_for('.password_reset_complete')) +@bp.route('/reset/done') +def password_reset_complete(): + return render_template('auth/password_reset_complete.html') -@bp.route("/reset/done") -def password_reset_complete() -> str: - return flask.render_template("auth/password_reset_complete.html") - - -@bp.route("/source_doc_upload", methods=["POST"]) +@bp.route('/source_doc_upload', methods=["POST"]) @show_errors -def source_doc_upload() -> Response: - """Source doc upload.""" - f = flask.request.files["sourcedoc_file"] +def source_doc_upload(): + f = request.files['sourcedoc_file'] text = f.read() doc = SourceDoc(text=text, user=current_user, filename=f.filename) session.add(doc) session.commit() - flask.flash("new source document uploaded") - return flask.redirect(doc.url) + flash('new source document uploaded') + return redirect(doc.url) +@bp.route('/about') +def about(): + return render_template('about.html') -@bp.route("/about") -def about() -> str: - """About page.""" - return flask.render_template("about.html") +@bp.route('/contact') +def contact(): + return render_template('contact.html') +def redirect_to_home(): + return redirect(url_for('.home')) -@bp.route("/contact") -def contact() -> str: - """Contact page.""" - return flask.render_template("contact.html") - - -def redirect_to_home() -> Response: - """Redirect to home page.""" - return flask.redirect(flask.url_for(".home")) - - -@bp.route("/login", methods=["GET", "POST"]) -def login() -> Response | str: - """Login page.""" - form = LoginForm(next=flask.request.args.get("next")) +@bp.route('/login', methods=['GET', 'POST']) +def login(): + form = LoginForm(next=request.args.get('next')) if form.validate_on_submit(): login_user(form.user, remember=form.remember.data) - flask.flash("Logged in successfully.") - return flask.redirect(flask.request.form.get("next") or flask.url_for(".home")) - return flask.render_template("login.html", form=form) + flash('Logged in successfully.') + return redirect(request.form.get('next') or url_for('.home')) + return render_template('login.html', form=form) - -@bp.route("/logout") -def logout() -> Response: - """Logout and redirect to home.""" +@bp.route('/logout') +def logout(): logout_user() - flask.flash("You have been logged out.") + flash('You have been logged out.') return redirect_to_home() - -@bp.route("/signup", methods=["GET", "POST"]) -def signup() -> str | Response: - if not flask.current_app.config.get("ALLOW_SIGNUP"): - flask.abort(404) +@bp.route('/signup', methods=['GET', 'POST']) +def signup(): + if not current_app.config.get('ALLOW_SIGNUP'): + abort(404) form = SignupForm() if not form.validate_on_submit(): - return flask.render_template("signup.html", form=form) + return render_template('signup.html', form=form) - data = form.data.copy() - del data["csrf_token"] - user = User(**data) + user = User(**form.data) session.add(user) session.commit() - flask.flash("New account created.") + flash('New account created.') login_user(user) - return flask.redirect(flask.url_for(view_after_signup)) + return redirect(url_for(view_after_signup)) +def redirect_to_doc(doc): + return redirect(url_for('.view_document', hashid=doc.hashid)) -def redirect_to_doc(doc: Item) -> Response: - """Redirect to the given item.""" - return flask.redirect(flask.url_for(".view_document", hashid=doc.hashid)) - - -def get_source_doc(username: str, hashid: str) -> SourceDoc: - """Get a source doc that belongs to the given uesr.""" +def get_source_doc(username, hashid): doc = Item.get_by_hashid(hashid) - if not doc or doc.user.username != username: - flask.abort(404) - return typing.cast(SourceDoc, doc) + if doc and doc.user.username != username: + doc = None + return doc if doc else abort(404) - -def get_xanapage(username: str, hashid: str) -> XanaPage: - """Get a xanapage that belongs to the given uesr.""" +def get_xanapage(username, hashid): doc = Item.get_by_hashid(hashid) - if not doc or doc.type != "xanapage" or doc.user.username != username: - flask.abort(404) - return typing.cast(XanaPage, doc) + if doc and doc.user.username != username: + doc = None + return doc if doc and doc.type == 'xanapage' else abort(404) - -def get_item(username: str, hashid: str) -> Item: - """Get an item with the given hashid, check ownership.""" +def get_item(username, hashid): doc = Item.get_by_hashid(hashid) - if not doc or doc.user.username != username: - flask.abort(404) - return doc + if doc and doc.user.username != username: + doc = None + return doc if doc else abort(404) - -@bp.route("///edl") -def view_edl(username: str, hashid: str) -> str: - """Show EDL for xanapage.""" +@bp.route('///edl') +def view_edl(username, hashid): item = get_xanapage(username, hashid) - return flask.render_template( - "view.html", doc=item, iter_lines=iter_lines, nbsp_at_start=nbsp_at_start - ) + return render_template('view.html', + doc=item, + iter_lines=iter_lines, + nbsp_at_start=nbsp_at_start) - -@bp.route("///realize") -def realize_edl(username: str, hashid: str) -> str: +@bp.route('///realize') +def realize_edl(username, hashid): item = get_xanapage(username, hashid) - assert item.text is not None spans = list(fulfil_edl(item.text)) - doc_text = "".join(span["text"] for span in spans) + doc_text = ''.join(span['text'] for span in spans) - return flask.render_template( - "realize.html", - doc=doc_text, - iter_lines=iter_lines, - item=item, - nbsp_at_start=nbsp_at_start, - ) + return render_template('realize.html', + doc=doc_text, + iter_lines=iter_lines, + item=item, + nbsp_at_start=nbsp_at_start) - -@bp.route("///raw") -def view_item_raw(username: str, hashid: str) -> str | Response: +@bp.route('///raw') +def view_item_raw(username, hashid): return view_item(username, hashid, raw=True) - -def fulfil_xanaflight(item: XanaLink) -> str: +def fulfil_xanaflight(item): link = item.parse() - assert link["type"] == "flight" - facets = link["facets"] + assert link['type'] == 'flight' + facets = link['facets'] docs = [] edl_list = [] all_links = [] for facet in facets: xanapage = Item.from_external(parse_xanapage_facet(facet)) - assert xanapage and xanapage.type == "xanapage" and xanapage.text + assert xanapage.type == 'xanapage' edl = parse_edl(xanapage.text) edl_list.append((xanapage, edl)) - all_links += [parse_link(link["text"]) for link in edl["links"]] + all_links += [parse_link(link['text']) for link in edl['links']] for doc_num, (xanapage, edl) in enumerate(edl_list): - doc = fulfil_edl_with_links( - edl, doc_num=doc_num, links=all_links, hide_all_transclusions=True - ) - doc["hashid"] = xanapage.hashid - del doc["link_count"] + doc = fulfil_edl_with_links(edl, + doc_num=doc_num, + links=all_links, + hide_all_transclusions=True) + doc['hashid'] = xanapage.hashid + del doc['link_count'] docs.append(doc) - return flask.render_template( - "view/xanaflight.html", item=item, link_count=len(all_links), docs=docs - ) + return render_template('view/xanaflight.html', + item=item, + link_count=len(all_links), + docs=docs) - -@bp.route("///fulfil") -def fulfil(username: str, hashid: str) -> str: - """Fufil XanaPage or XanaLink.""" +@bp.route('///fulfil') +def fulfil(username, hashid): item = get_item(username, hashid) - if item.type == "xanapage": - assert item.text is not None - - return flask.render_template( - "view/xanapage.html", item=item, doc=fulfil_edl_with_sources(item.text) - ) - if ( - item.type == "xanalink" - and item.text is not None - and item.text.startswith("type=flight") - ): - assert isinstance(item, XanaLink) + if item.type == 'xanapage': + return render_template('view/xanapage.html', + item=item, + doc=fulfil_edl_with_sources(item.text)) + if item.type == 'xanalink' and item.text.startswith('type=flight'): return fulfil_xanaflight(item) - assert False - - -@bp.route("///set_title", methods=["POST"]) -def set_title(username: str, hashid: str) -> Response: +@bp.route('///set_title', methods=['POST']) +def set_title(username, hashid): item = get_item(username, hashid) - has_title = item.has_title() - item.set_title(flask.request.form["title"], current_user) - flask.flash("title change saved" if has_title else "title added") - return flask.redirect(item.url) + has_title = item.has_title + item.set_title(request.form['title'], current_user) + flash('title change saved' if has_title else 'title added') + return redirect(item.url) - -@bp.route("///delete", methods=["POST"]) -def delete_item(username: str, hashid: str) -> Response: +@bp.route('///delete', methods=['POST']) +def delete_item(username, hashid): item = get_item(username, hashid) session.delete(item) session.commit() - flask.flash("item deleted") + flash('item deleted') return redirect_to_home() - -def save_new_xanalink(doc1: XanaPage, doc2: XanaPage) -> None: - """Save XanaLink.""" - start1 = flask.request.form["left_start"] - length1 = flask.request.form["left_length"] - start2 = flask.request.form["right_start"] - length2 = flask.request.form["right_length"] +def save_new_xanalink(doc1, doc2): + start1 = request.form['left_start'] + length1 = request.form['left_length'] + start2 = request.form['right_start'] + length2 = request.form['right_length'] assert length1 assert length2 - span1 = f"{doc1.external_url},start={start1},length={length1}" - span2 = f"{doc2.external_url},start={start2},length={length2}" + span1 = f'{doc1.external_url},start={start1},length={length1}' + span2 = f'{doc2.external_url},start={start2},length={length2}' lines = [ - "type=", - "facet=", - "span: " + span1, - "facet=", - "span: " + span2, + 'type=', + 'facet=', + 'span: ' + span1, + 'facet=', + 'span: ' + span2, ] - text = "".join(line + "\n" for line in lines) + text = ''.join(line + '\n' for line in lines) obj = XanaLink(user=current_user, text=text) session.add(obj) @@ -401,155 +320,126 @@ def save_new_xanalink(doc1: XanaPage, doc2: XanaPage) -> None: session.add(ref2) session.commit() - -@bp.route("/build_links", methods=["GET", "POST"]) -def build_links() -> str | Response: - """Build links.""" +@bp.route('/build_links', methods=['GET', 'POST']) +def build_links(): doc1, doc2 = None, None hashid1, hashid2 = None, None - if "doc1" in flask.request.args: - hashid1 = flask.request.args["doc1"] + if 'doc1' in request.args: + hashid1 = request.args['doc1'] doc1 = Item.get_by_hashid(hashid1) - if "doc2" in flask.request.args: - hashid2 = flask.request.args["doc2"] + if 'doc2' in request.args: + hashid2 = request.args['doc2'] doc2 = Item.get_by_hashid(hashid2) - if flask.request.method == "POST": - assert ( - doc1 and doc2 and isinstance(doc1, XanaPage) and isinstance(doc2, XanaPage) - ) + if request.method == 'POST': save_new_xanalink(doc1, doc2) - assert flask.request.endpoint - return flask.redirect( - flask.url_for(flask.request.endpoint, doc1=hashid1, doc2=hashid2) - ) + return redirect(url_for(request.endpoint, doc1=hashid1, doc2=hashid2)) if doc1 and doc2: - assert isinstance(doc1, XanaPage) and isinstance(doc2, XanaPage) - links = list({i for i in doc1.subjects} & {i for i in doc2.subjects}) + links = list({i for i in doc1.subjects} & + {i for i in doc2.subjects}) else: links = [] - return flask.render_template( - "build_links.html", - iter_lines=iter_lines, - nbsp_at_start=nbsp_at_start, - SourceDoc=SourceDoc, - hashid1=hashid1, - hashid2=hashid2, - doc1=doc1, - doc2=doc2, - links=links, - ) + return render_template('build_links.html', + iter_lines=iter_lines, + nbsp_at_start=nbsp_at_start, + SourceDoc=SourceDoc, + hashid1=hashid1, + hashid2=hashid2, + doc1=doc1, + doc2=doc2, + links=links) - -@bp.route("//") -def view_item(username: str, hashid: str, raw: bool = False) -> str | Response: - """View item.""" - if "," in hashid: +@bp.route('//') +def view_item(username, hashid, raw=False): + if ',' in hashid: m = re_spanpointer.match(hashid) - assert m hashid, start, length = m.group(1), int(m.group(2)), int(m.group(3)) item = get_item(username, hashid) if raw: - assert item.text is not None - return flask.Response( - item.text[start : length + start], mimetype="text/plain" - ) + return Response(item.text[start:length + start], mimetype='text/plain') else: start, length = None, None item = get_item(username, hashid) if raw: - return flask.Response(item.text, mimetype="text/plain") + return Response(item.text, mimetype='text/plain') - v = flask.request.args.get("v") + v = request.args.get('v') if v: if not v.isdigit(): - flask.abort(404) + abort(404) try: version = item.versions[int(v) - 1] except IndexError: - flask.abort(404) + abort(404) text = version.text else: version = None text = item.text - if item.type == "xanapage": - assert item.text + if item.type == 'xanapage': spans = list(fulfil_edl(item.text)) - doc_text = "".join(span["text"] for span in spans) + doc_text = ''.join(span['text'] for span in spans) else: doc_text = None - return flask.render_template( - "view.html", - doc=item, - doc_text=doc_text, - version=version, - text=text, - span_start=start, - span_length=length, - add_highlight=add_highlight, - nbsp_at_start=nbsp_at_start, - iter_lines=iter_lines, - ) + return render_template('view.html', + doc=item, + doc_text=doc_text, + version=version, + text=text, + span_start=start, + span_length=length, + add_highlight=add_highlight, + nbsp_at_start=nbsp_at_start, + iter_lines=iter_lines) - -@bp.route("///history") -def history(username: str, hashid: str) -> str: +@bp.route('///history') +def history(username, hashid): item = get_item(username, hashid) - return flask.render_template("history.html", doc=item) + return render_template('history.html', doc=item) - -@bp.route("///as_xanapage", methods=["POST"]) -def create_xanapage_from_sourcedoc(username: str, hashid: str) -> Response: +@bp.route('///as_xanapage', methods=['POST']) +def create_xanapage_from_sourcedoc(username, hashid): src_doc = get_source_doc(username, hashid) - edl = "span: " + src_doc.entire_span + "\n" + edl = 'span: ' + src_doc.entire_span + '\n' page = XanaPage(user=current_user, text=edl) session.add(page) session.commit() - page.update_references() - flask.flash("New xanapage created.") - return flask.redirect(page.url) + flash('New xanapage created.') + return redirect(page.url) - -@bp.route("///xanaedit", methods=["GET", "POST"]) -def xanaedit_item(username: str, hashid: str) -> str | Response: - if flask.request.method == "POST": +@bp.route('///xanaedit', methods=['GET', 'POST']) +def xanaedit_item(username, hashid): + if request.method == 'POST': save_xanaedit(username, hashid) - return flask.redirect( - flask.url_for("xanaedit_item", username=username, hashid=hashid) - ) + return redirect(url_for('xanaedit_item', username=username, hashid=hashid)) doc = get_xanapage(username, hashid) - assert doc.text is not None spans = list(fulfil_edl(doc.text)) - doc_text = "".join(span["text"] for span in spans) + doc_text = ''.join(span['text'] for span in spans) - return flask.render_template("xanaedit.html", doc=doc, doc_text=doc_text) + return render_template('xanaedit.html', doc=doc, doc_text=doc_text) - -def save_xanaedit(username: str, hashid: str) -> XanaPage: - """Save a XanaEdit.""" +def save_xanaedit(username, hashid): page = get_xanapage(username, hashid) - assert page.text current_edl = parse_edl(page.text) - spans = [Span(*span) for span in current_edl["spans"]] - edits = json.loads(flask.request.form["edits"]) - new_text = "" + spans = [Span(*span) for span in current_edl['spans']] + edits = json.loads(request.form['edits']) + new_text = '' new_text_pos = 0 for edit in edits: - if edit["op"] not in ("insert", "replace"): + if edit['op'] not in ('insert', 'replace'): continue - new_text += edit["new"] - edit["span"] = Span("placeholder", new_text_pos, len(edit["new"])) - new_text_pos += len(edit["new"]) + new_text += edit['new'] + edit['span'] = Span('placeholder', new_text_pos, len(edit['new'])) + new_text_pos += len(edit['new']) spans = apply_edits(spans, edits) new_src_doc = SourceDoc(user=current_user, text=new_text) @@ -557,162 +447,141 @@ def save_xanaedit(username: str, hashid: str) -> XanaPage: session.commit() for span in spans: - if span.url == "placeholder": + if span.url == 'placeholder': span.url = new_src_doc.external_url - new_edl = "".join(f"span: {span.for_edl()}\n" for span in spans) + new_edl = ''.join(f'span: {span.for_edl()}\n' for span in spans) page.text = new_edl session.commit() - page.update_references() - flask.flash("Edits saved.") + flash('Edits saved.') return page +@bp.route('///finish', methods=['POST']) +def finish_xanaedit(username, hashid): -@bp.route("///finish", methods=["POST"]) -def finish_xanaedit(username: str, hashid: str) -> Response: page = save_xanaedit(username, hashid) - return flask.redirect(page.url) + return redirect(page.url) - -@bp.route("///edit", methods=["GET", "POST"]) -def edit_item(username: str, hashid: str) -> str | Response: +@bp.route('///edit', methods=['GET', 'POST']) +def edit_item(username, hashid): obj = get_item(username, hashid) form = SourceDocForm(obj=obj) if form.validate_on_submit(): form.populate_obj(obj) session.commit() - if obj.type == "xanapage": - obj.update_references() - flask.flash("Changes to {} saved.".format(obj.type)) - return flask.redirect(obj.url) - return flask.render_template("edit.html", form=form, doc=obj) + flash('Changes to {} saved.'.format(obj.type)) + return redirect(obj.url) + return render_template('edit.html', form=form, doc=obj) - -@bp.route("/source_doc_text/") -def source_doc_text(source_doc_id: int) -> str: +@bp.route('/source_doc_text/') +def source_doc_text(source_doc_id): doc = SourceDoc.query.get(source_doc_id) - return flask.render_template("source_doc_text.html", doc=doc, iter_lines=iter_lines) + return render_template('source_doc_text.html', doc=doc, iter_lines=iter_lines) - -@bp.route("/settings/account", methods=["GET", "POST"]) +@bp.route('/settings/account', methods=['GET', 'POST']) @login_required -def account_settings() -> str | Response: - """Account settings.""" +def account_settings(): form = AccountSettingsForm(obj=current_user) if form.validate_on_submit(): form.populate_obj(current_user) session.commit() - flask.flash("Account details updated.") - assert flask.request.endpoint - return flask.redirect(flask.url_for(flask.request.endpoint)) - return flask.render_template("user/account.html", form=form) + flash('Account details updated.') + return redirect(url_for(request.endpoint)) + return render_template('user/account.html', form=form) - -@bp.route("/new/sourcedoc", methods=["GET", "POST"]) +@bp.route('/new/sourcedoc', methods=['GET', 'POST']) @login_required -def new_sourcedoc() -> str | Response: - """Add a new sourcedoc.""" +def new_sourcedoc(): form = SourceDocForm() if form.validate_on_submit(): doc = SourceDoc(user=current_user) form.populate_obj(doc) session.add(doc) session.commit() - flask.flash("New document saved.") - return flask.redirect(doc.url) - return flask.render_template("new.html", form=form, item_type="source document") + flash('New document saved.') + return redirect(doc.url) + return render_template('new.html', form=form, item_type='source document') - -@bp.route("/new/xanalink/raw", methods=["GET", "POST"]) +@bp.route('/new/xanalink/raw', methods=['GET', 'POST']) @login_required -def new_xanalink_raw() -> str | Response: +def new_xanalink_raw(): form = ItemForm() if form.validate_on_submit(): obj = XanaLink(user=current_user) form.populate_obj(obj) session.add(obj) session.commit() - flask.flash("New xanalink saved.") - return flask.redirect(obj.url) - return flask.render_template("new.html", form=form, item_type="xanalink") + flash('New xanalink saved.') + return redirect(obj.url) + return render_template('new.html', form=form, item_type='xanalink') - -@bp.route("/new/xanalink", methods=["GET", "POST"]) +@bp.route('/new/xanalink', methods=['GET', 'POST']) @login_required -def new_xanalink() -> str | Response: - """Add a new xanalink.""" - if flask.request.method != "POST": - return flask.render_template("new_xanalink.html") +def new_xanalink(): + if request.method != 'POST': + return render_template('new_xanalink.html') - data = flask.request.get_json() - assert isinstance(data, dict) - lines = ["type=" + typing.cast(str, data["link_type"])] - for facet in typing.cast(list[list[str]], data["facets"]): - lines += ["facet="] + facet - text = "".join(line + "\n" for line in lines) + data = request.get_json() + lines = ['type=' + data['link_type']] + for facet in data['facets']: + lines += ['facet='] + facet + text = ''.join(line + '\n' for line in lines) obj = XanaLink(user=current_user, text=text) session.add(obj) session.commit() - flask.flash("New xanalink saved.") - return flask.jsonify(url=obj.url) + flash('New xanalink saved.') + return jsonify(url=obj.url) - -@bp.route("/new/xanapage", methods=["GET", "POST"]) +@bp.route('/new/xanapage', methods=['GET', 'POST']) @login_required -def new_xanapage() -> str | Response: - """Start a new xanapage.""" +def new_xanapage(): form = ItemForm() if form.validate_on_submit(): obj = XanaPage(user=current_user) form.populate_obj(obj) session.add(obj) session.commit() - flask.flash("New xanapage saved.") - return flask.redirect(obj.url) - return flask.render_template("new.html", form=form, item_type="xanapage") + flash('New xanapage saved.') + return redirect(obj.url) + return render_template('new.html', form=form, item_type='xanapage') - -@bp.route("/edit/", methods=["GET", "POST"]) +@bp.route('/edit/', methods=['GET', 'POST']) @login_required -def edit_source_document(filename: str) -> str | Response: - """Edit a source document.""" +def edit_source_document(filename): doc = get_source_doc(current_user.username, filename) form = SourceDocForm(obj=doc) if form.validate_on_submit(): form.populate_obj(doc) session.add(doc) session.commit() - flask.flash("Changes to document saved.") - return flask.redirect(doc.url) - return flask.render_template("edit.html", form=form, doc=doc) + flash('Changes to document saved.') + return redirect(doc.url) + return render_template('edit.html', form=form, doc=doc) - -@bp.route("/api/1/get//") -def api_get_document(username: str, filename: str) -> Response: - """API to get a document, returns JSON.""" +@bp.route('/api/1/get//') +def api_get_document(username, filename): doc = get_source_doc(username, filename) - assert doc.text is not None + if not doc: + return abort(404) ret = { - "username": username, - "filename": filename, - "character_count": len(doc.text), - "document_price": str(doc.document_price), - "price_per_character": str(doc.price_per_character), + 'username': username, + 'filename': filename, + 'character_count': len(doc.text), + 'document_price': str(doc.document_price), + 'price_per_character': str(doc.price_per_character), } - return flask.jsonify(ret) + return jsonify(ret) - -@bp.route("/get_span.json") -def get_span() -> Response: - """Return JSON representing a span.""" - url = flask.request.args["url"] - start = int(flask.request.args["start"]) - length = int(flask.request.args["length"]) - spanid = flask.request.args["spanid"] +@bp.route('/get_span.json') +def get_span(): + url = request.args['url'] + start = int(request.args['start']) + length = int(request.args['length']) + spanid = request.args['spanid'] text = get_url(url) - return flask.jsonify(text=text[start : start + length], spanid=spanid) + return jsonify(text=text[start:start + length], spanid=spanid)
{%- for line in doc.snippet().splitlines() -%} @@ -83,7 +79,6 @@
') + p.replace('\n', Markup('')) + Markup(u'