Compare commits

..

No commits in common. "fcc3afa4dca3872d2068b028b662bc7b982745be" and "994fccf764d90da68745cf8f24bc8350808e951e" have entirely different histories.

20 changed files with 739 additions and 1308 deletions

4
run.py
View file

@ -3,5 +3,5 @@
from sourcing import create_app from sourcing import create_app
if __name__ == "__main__": if __name__ == "__main__":
app = create_app("config.default") app = create_app('config.default')
app.run("0.0.0.0", debug=True) app.run('0.0.0.0', debug=True)

View file

@ -1,14 +1,12 @@
from flask import Flask from flask import Flask
from . import database
from . import database, view from . import view
from .utils import display_datetime from .utils import display_datetime
def create_app(config):
def create_app(config: str) -> Flask:
"""Create the application."""
app = Flask(__name__) app = Flask(__name__)
app.config.from_object(config) app.config.from_object(config)
database.init_app(app) database.init_app(app)
view.init_app(app) view.init_app(app)
app.jinja_env.filters["datetime"] = display_datetime app.jinja_env.filters['datetime'] = display_datetime
return app return app

View file

@ -1,50 +1,43 @@
from . import create_app, model, database, edl
from pprint import pprint from pprint import pprint
import click import click
from . import create_app, database, edl, model app = create_app('config.default')
app = create_app("config.default")
@app.cli.command() @app.cli.command()
@click.argument("user_or_email") @click.argument('user_or_email')
@click.option("--password", prompt=True, hide_input=True) @click.option('--password', prompt=True, hide_input=True)
def reset_password(user_or_email, password): def reset_password(user_or_email, password):
user = model.User.lookup_user_or_email(user_or_email) user = model.User.lookup_user_or_email(user_or_email)
user.set_password(password) user.set_password(password)
database.session.commit() database.session.commit()
print(f"password updated for {user.username} ({user.email})") print(f'password updated for {user.username} ({user.email})')
@app.cli.command() @app.cli.command()
@click.argument("hashid") @click.argument('hashid')
def parse_link(hashid): def parse_link(hashid):
home = "http://localhost:5000/" home = 'http://localhost:5000/'
item = model.Item.get_by_hashid(hashid) item = model.Item.get_by_hashid(hashid)
pprint(item.parse()) pprint(item.parse())
print(item.item_and_title(home)) print(item.item_and_title(home))
@app.cli.command() @app.cli.command()
def all_titles(): def all_titles():
home = "http://localhost:5000/" home = 'http://localhost:5000/'
titles = model.XanaLink.get_all_titles(home=home) titles = model.XanaLink.get_all_titles(home=home)
print(titles.values()) print(titles.values())
@app.cli.command() @app.cli.command()
@click.argument("hashid") @click.argument('hashid')
def delete_item(hashid): def delete_item(hashid):
item = model.Item.get_by_hashid(hashid) item = model.Item.get_by_hashid(hashid)
database.session.delete(item) database.session.delete(item)
database.session.commit() database.session.commit()
@app.cli.command() @app.cli.command()
def populate_references(): def populate_references():
home = "http://localhost:5000/" home = 'http://localhost:5000/'
seen = set() seen = set()
for ref in model.Reference.query: for ref in model.Reference.query:
@ -52,14 +45,14 @@ def populate_references():
for link_obj in model.XanaLink.query: for link_obj in model.XanaLink.query:
link = link_obj.parse() link = link_obj.parse()
for items in link["facets"]: for items in link['facets']:
for i in items: for i in items:
k, _, v = i.partition(": ") k, _, v = i.partition(': ')
if k == "span" and "," in v: if k == 'span' and ',' in v:
v = v.partition(",")[0] v = v.partition(',')[0]
item = model.Item.from_external(v, home=home) item = model.Item.from_external(v, home=home)
if item: if item:
print(link_obj.id, "->", item.id) print(link_obj.id, '->', item.id)
as_tuple = (link_obj.id, item.id) as_tuple = (link_obj.id, item.id)
if as_tuple in seen: if as_tuple in seen:
continue continue
@ -69,13 +62,13 @@ def populate_references():
for xanapage in model.XanaPage.query: for xanapage in model.XanaPage.query:
doc_edl = edl.parse_edl(xanapage.text) doc_edl = edl.parse_edl(xanapage.text)
if "spans" not in doc_edl or not doc_edl["spans"]: if 'spans' not in doc_edl or not doc_edl['spans']:
continue continue
for url, start, length in doc_edl["spans"]: for url, start, length in doc_edl['spans']:
src_doc = model.Item.from_external(url, home=home) src_doc = model.Item.from_external(url, home=home)
if not src_doc.id: if not src_doc.id:
continue continue
print(xanapage.id, "->", src_doc.id) print(xanapage.id, '->', src_doc.id)
as_tuple = (xanapage.id, src_doc.id) as_tuple = (xanapage.id, src_doc.id)
if as_tuple in seen: if as_tuple in seen:
continue continue
@ -85,11 +78,10 @@ def populate_references():
database.session.commit() database.session.commit()
@app.cli.command() @app.cli.command()
@click.argument("hashid") @click.argument('hashid')
def show_references(hashid): def show_references(hashid):
item = model.Item.get_by_hashid(hashid) item = model.Item.get_by_hashid(hashid)
print("item_id:", item.id) print('item_id:', item.id)
print("subjects:", [i.id for i in item.subjects]) print('subjects:', [i.id for i in item.subjects])
print("objects:", [i.id for i in item.objects]) print('objects:', [i.id for i in item.objects])

View file

@ -1,28 +1,18 @@
"""Database."""
import flask
import sqlalchemy
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.orm import scoped_session, sessionmaker
session = scoped_session(sessionmaker()) session = scoped_session(sessionmaker())
def init_db(db_url):
def init_db(db_url: str) -> None:
"""Initialise databsae."""
session.configure(bind=get_engine(db_url)) session.configure(bind=get_engine(db_url))
def get_engine(db_url):
return create_engine(db_url, pool_recycle=3600)
def get_engine(db_url: str) -> sqlalchemy.engine.base.Engine: def init_app(app):
"""Create an engine object.""" db_url = app.config['DB_URL']
return create_engine(db_url, echo=False, pool_recycle=3600)
def init_app(app: flask.app.Flask) -> None:
"""Initialise database connection within flask app."""
db_url = app.config["DB_URL"]
session.configure(bind=get_engine(db_url)) session.configure(bind=get_engine(db_url))
@app.teardown_appcontext @app.teardown_appcontext
def shutdown_session(exception: Exception | None = None) -> None: def shutdown_session(exception=None):
session.remove() session.remove()

View file

@ -1,43 +1,25 @@
"""Edit."""
import typing
import attr import attr
from .span import Span
class EditOutOfRange(Exception): class EditOutOfRange(Exception):
"""Edit out of range.""" pass
def apply_delete(current_spans, edit):
class Edit(typing.TypedDict):
"""Edit."""
start: int
old: str
span: Span
op: str
def apply_delete(current_spans: list[Span], edit: Edit) -> list[Span]:
"""Apply delete."""
assert edit assert edit
if not current_spans: if not current_spans:
raise ValueError("edit is out of bounds") raise ValueError('edit is out of bounds')
spans = [] spans = []
pos = 0 pos = 0
edit_end = edit["start"] + len(edit["old"]) edit_end = edit['start'] + len(edit['old'])
cur_span = current_spans.pop(0) cur_span = current_spans.pop(0)
while pos + cur_span.length < edit["start"]: while pos + cur_span.length < edit['start']:
spans.append(cur_span) spans.append(cur_span)
pos += cur_span.length pos += cur_span.length
cur_span = current_spans.pop(0) cur_span = current_spans.pop(0)
if edit["start"] > pos: if edit['start'] > pos:
new_span = attr.evolve(cur_span, length=edit["start"] - pos) new_span = attr.evolve(cur_span, length=edit['start'] - pos)
spans.append(new_span) spans.append(new_span)
while pos + cur_span.length < edit_end: while pos + cur_span.length < edit_end:
@ -48,27 +30,28 @@ def apply_delete(current_spans: list[Span], edit: Edit) -> list[Span]:
offset = cur_span.start - pos offset = cur_span.start - pos
new_start = offset + (edit_end - pos) new_start = offset + (edit_end - pos)
diff = new_start - cur_span.start diff = new_start - cur_span.start
new_span = attr.evolve(cur_span, length=cur_span.length - diff, start=new_start) new_span = attr.evolve(cur_span,
length=cur_span.length - diff,
start=new_start)
spans.append(new_span) spans.append(new_span)
spans += current_spans spans += current_spans
return spans return spans
def apply_insert(current_spans, edit):
def apply_insert(current_spans: list[Span], edit: Edit) -> Span | list[Span]: if not current_spans and edit['0'] == 0:
if not current_spans and edit["0"] == 0: return edit['span']
return edit["span"]
pos = 0 pos = 0
spans = [] spans = []
cur_span = current_spans.pop(0) cur_span = current_spans.pop(0)
while pos + cur_span.length < edit["start"]: while pos + cur_span.length < edit['start']:
spans.append(cur_span) spans.append(cur_span)
pos += cur_span.length pos += cur_span.length
cur_span = current_spans.pop(0) cur_span = current_spans.pop(0)
if edit["start"] >= pos: if edit['start'] >= pos:
length_a = edit["start"] - pos length_a = edit['start'] - pos
length_b = cur_span.length - length_a length_b = cur_span.length - length_a
if length_a: if length_a:
@ -76,34 +59,35 @@ def apply_insert(current_spans: list[Span], edit: Edit) -> Span | list[Span]:
pos += length_a pos += length_a
spans.append(span_a) spans.append(span_a)
spans.append(edit["span"]) spans.append(edit['span'])
pos += edit["span"].length pos += edit['span'].length
if length_b: if length_b:
span_b = attr.evolve( span_b = attr.evolve(cur_span,
cur_span, start=cur_span.start + length_a, length=length_b start=cur_span.start + length_a,
) length=length_b)
spans.append(span_b) spans.append(span_b)
pos += length_b pos += length_b
else: else:
spans.append(edit["span"]) spans.append(edit['span'])
spans += current_spans spans += current_spans
return spans return spans
def apply_edits(spans, edits):
def apply_edits(spans: list[Span], edits: list[Edit]) -> list[Span]:
for edit in edits: for edit in edits:
if edit["op"] == "delete": if edit['op'] == 'delete':
spans = apply_delete(spans, edit) spans = apply_delete(spans, edit)
continue continue
if edit["op"] == "insert": if edit['op'] == 'insert':
spans = apply_insert(spans, edit) spans = apply_insert(spans, edit)
continue continue
if edit["op"] == "replace": if edit['op'] == 'replace':
spans = apply_delete(spans, edit) spans = apply_delete(spans, edit)
spans = apply_insert(spans, edit) spans = apply_insert(spans, edit)
continue continue
return spans return spans

View file

@ -1,126 +1,87 @@
"""Edit decision list.""" from .url import get_url, get_text
from .parse import get_span, parse_span, parse_link, parse_sourcedoc_facet, xanapage_span_html, span_html, get_urls
import re
import typing
from collections import defaultdict from collections import defaultdict
from html import escape from html import escape
from pprint import pprint
from .parse import (
SourceText,
SpanContents,
get_span,
get_urls,
parse_link,
parse_sourcedoc_facet,
parse_span,
span_html,
xanapage_span_html,
)
from .url import get_text, get_url
from .utils import protect_start_spaces from .utils import protect_start_spaces
re_comment = re.compile(r"#.*") import re
re_xanalink = re.compile("xanalink: +([^ ]+) *$")
re_comment = re.compile(r'#.*')
re_xanalink = re.compile('xanalink: +([^ ]+) *$')
max_sourcedoc_size = 600000 max_sourcedoc_size = 600000
def fulfil_edl(edl):
def fulfil_edl(edl: str) -> typing.Iterator[SpanContents]: text = {}
"""Yeild each span of an EDL.""" for url, start, length in parse_edl(edl)['spans']:
text: SourceText = {}
for url, start, length in parse_edl(edl)["spans"]:
if url not in text: if url not in text:
text[url] = get_text(url) text[url] = get_text(url)
yield get_span(text, url, start, length) yield get_span(text, url, start, length)
def parse_edl(edl_text):
class EDLDict(typing.TypedDict): edl = {
"""Dict representing an EDL.""" 'spans': [],
'links': [],
spans: list[tuple[str, int, int]]
links: list[dict[str, typing.Any]]
def parse_edl(edl_text: str) -> EDLDict:
"""Parse an EDL and return contents."""
edl: EDLDict = {
"spans": [],
"links": [],
} }
for line in edl_text.splitlines(): for line in edl_text.splitlines():
line = re_comment.sub("", line).strip() line = re_comment.sub('', line).strip()
if not line: if not line:
continue continue
span_pointer = parse_span(line) span_pointer = parse_span(line)
if span_pointer: if span_pointer:
edl["spans"].append(span_pointer) edl['spans'].append(span_pointer)
continue continue
m = re_xanalink.match(line) m = re_xanalink.match(line)
if m: if m:
link_url = m.group(1) link_url = m.group(1)
edl["links"].append( edl['links'].append({
{ 'url': link_url,
"url": link_url, 'text': get_url(link_url),
"text": get_url(link_url), })
}
)
continue continue
return edl return edl
def fulfil_edl_with_sources(edl_text, links=None, hide_all_transclusions=False):
def fulfil_edl_with_sources(
edl_text: str, links=None, hide_all_transclusions: bool = False
):
edl = parse_edl(edl_text) edl = parse_edl(edl_text)
return fulfil_edl_with_links( return fulfil_edl_with_links(edl,
edl, links=links, hide_all_transclusions=hide_all_transclusions links=links,
) hide_all_transclusions=hide_all_transclusions)
def fulfil_edl_with_links(edl, doc_num='', links=None, hide_all_transclusions=False):
def fulfil_edl_with_links( spans = edl['spans']
edl: EDLDict,
doc_num: int | str = "",
links: list[dict[str, str]] | None = None,
hide_all_transclusions: bool = False,
):
spans = edl["spans"]
hide_transclusions = set() hide_transclusions = set()
two_facet_links = [] two_facet_links = []
if not links: if not links:
links = [parse_link(link["text"]) for link in edl["links"]] links = [parse_link(link['text']) for link in edl['links']]
link_num = 0 link_num = 0
for link in links: for link in links:
if link["type"] == "HideTransclusions": if link['type'] == 'HideTransclusions':
hide_transclusions.add(parse_sourcedoc_facet(link["facets"][0])) hide_transclusions.add(parse_sourcedoc_facet(link['facets'][0]))
elif len(link["facets"]) == 2: elif len(link['facets']) == 2:
two_facet_links.append( two_facet_links.append((link_num, [parse_span(span[0]) for span in link['facets']]))
(link_num, [parse_span(span[0]) for span in link["facets"]])
)
link_num += 1 link_num += 1
source = [get_text(url) for url in get_urls(spans)] source = [get_text(url) for url in get_urls(spans)]
source_text = {s["url"]: s["text"] for s in source} source_text = {s['url']: s['text'] for s in source}
source_doc_links = defaultdict(list) source_doc_links = defaultdict(list)
for link_num, facets in two_facet_links: for link_num, facets in two_facet_links:
for facet_num, span in enumerate(facets): for facet_num, span in enumerate(facets):
assert span
url, start, length = span url, start, length = span
source_doc_links[url].append((start, length, link_num, "link", facet_num)) source_doc_links[url].append((start, length, link_num, 'link', facet_num))
if url in source_text: if url in source_text:
continue continue
s = get_text(url) s = get_text(url)
source.append(s) source.append(s)
source_text[s["url"]] = s["text"] source_text[s['url']] = s['text']
for s in source_doc_links.values(): for s in source_doc_links.values():
s.sort() s.sort()
@ -131,67 +92,28 @@ def fulfil_edl_with_links(
for num, (url, start, length) in spans: for num, (url, start, length) in spans:
highlight = not hide_all_transclusions and url not in hide_transclusions highlight = not hide_all_transclusions and url not in hide_transclusions
span_text = source_text[url] # [start:start + length] span_text = source_text[url] # [start:start + length]
new_text = "" new_text = ''
pos = start pos = start
for link_start, link_len, link_num, span_type, facet_num in source_doc_links[ for link_start, link_len, link_num, span_type, facet_num in source_doc_links[url]:
url
]:
link_end = link_start + link_len link_end = link_start + link_len
if link_start >= start + length: if link_start >= start + length:
break break
if link_end < start: if link_end < start:
continue continue
cls = "xanapagelink link" cls = 'xanapagelink link'
link_span = ( link_span = (f'<span class="{cls}" id="link{facet_num}_{link_num}">' +
f'<span class="{cls}" id="link{facet_num}_{link_num}">' escape(span_text[link_start:link_end]) +
+ escape(span_text[link_start:link_end]) '</span>')
+ "</span>"
)
new_text += escape(span_text[pos:link_start]) + link_span new_text += escape(span_text[pos:link_start]) + link_span
pos = link_end pos = link_end
new_text += escape(span_text[pos : start + length]) new_text += escape(span_text[pos:start + length])
cur = xanapage_span_html(num, new_text, url, start, length, highlight=highlight) cur = xanapage_span_html(num, new_text, url, start, length, highlight=highlight)
doc_spans.append(cur) doc_spans.append(cur)
doc = "".join(doc_spans) doc = ''.join(doc_spans)
for s in source:
text = protect_start_spaces(s.pop("text"))
if s["length"] > max_sourcedoc_size:
# print('{} > {}'.format(s['length'], max_sourcedoc_size))
continue
if s["url"] in hide_transclusions:
continue
source_spans = [
(start, length, num, "transclusion", 0)
for num, (url, start, length) in spans
if url == s["url"]
]
source_spans += source_doc_links[s["url"]]
source_spans.sort()
new_text = ""
pos = 0
pprint(source_spans)
for start, length, num, span_type, _ in source_spans:
end = start + length
new_text += (
escape(text[pos:start])
+ span_html(span_type, num)
+ escape(text[start:end])
+ "</span>"
)
pos = end
new_text += escape(text[pos:])
new_text = new_text.replace("\n", "<br/>\n")
s["text"] = new_text
return { return {
"source": source, 'doc': doc.replace('\n', '<br/>\n'),
"doc": doc.replace("\n", "<br/>\n"), 'span_count': len(spans),
"span_count": len(spans), 'link_count': len(two_facet_links),
"link_count": len(two_facet_links),
} }

View file

@ -1,135 +1,82 @@
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms.fields import ( from wtforms.fields import StringField, PasswordField, BooleanField, HiddenField, TextAreaField, FileField, IntegerField
BooleanField, from wtforms.validators import InputRequired, Email, Length, ValidationError, Regexp, NoneOf, Optional
FileField, from .model import User, LoginError, re_username, reserved_name, user_exists
HiddenField,
IntegerField,
PasswordField,
StringField,
TextAreaField,
)
from wtforms.validators import (
Email,
InputRequired,
Length,
NoneOf,
Optional,
Regexp,
ValidationError,
)
from .model import LoginError, User, re_username, reserved_name, user_exists
PASSWORD_LEN = 64 PASSWORD_LEN = 64
EMAIL_LEN = 64 EMAIL_LEN = 64
class SignupForm(FlaskForm): class SignupForm(FlaskForm):
"""Signup form.""" username = StringField('username',
[InputRequired(),
username = StringField( Regexp(re_username),
"username", NoneOf(reserved_name, message='Not available.'),
[ Length(min=3, max=64)],
InputRequired(), [lambda name: name and name.replace(' ', '_')])
Regexp(re_username), email = StringField('e-mail address',
NoneOf(reserved_name, message="Not available."), [InputRequired(), Email(),
Length(min=3, max=64), Length(min=5, max=EMAIL_LEN)],
], description="we never share your e-mail address")
[lambda name: name and name.replace(" ", "_")], password = StringField('password',
) [InputRequired(), Length(min=4, max=PASSWORD_LEN)])
email = StringField(
"e-mail address",
[InputRequired(), Email(), Length(min=5, max=EMAIL_LEN)],
description="we never share your e-mail address",
)
password = StringField(
"password", [InputRequired(), Length(min=4, max=PASSWORD_LEN)]
)
def validate_username(form, field): def validate_username(form, field):
if user_exists(User.username, field.data): if user_exists(User.username, field.data):
raise ValidationError("Not available") raise ValidationError('Not available')
def validate_email(form, field): def validate_email(form, field):
if user_exists(User.email, field.data): if user_exists(User.email, field.data):
raise ValidationError("In use by another account") raise ValidationError('In use by another account')
class LoginForm(FlaskForm): class LoginForm(FlaskForm):
"""Login form.""" user_or_email = StringField('username or e-mail address',
[InputRequired(), Length(min=3, max=EMAIL_LEN)],
[lambda name: name and name.replace(' ', '_')])
password = PasswordField('password',
[InputRequired(), Length(max=PASSWORD_LEN)])
remember = BooleanField('stay logged in')
next = HiddenField('next')
user_or_email = StringField( def validate(self):
"username or e-mail address",
[InputRequired(), Length(min=3, max=EMAIL_LEN)],
[lambda name: name and name.replace(" ", "_")],
)
password = PasswordField("password", [InputRequired(), Length(max=PASSWORD_LEN)])
remember = BooleanField("stay logged in")
next = HiddenField("next")
def validate(self) -> bool:
"""Validate."""
rv = FlaskForm.validate(self) rv = FlaskForm.validate(self)
if not rv: if not rv:
return False return False
try: try:
self.user = User.attempt_login(self.user_or_email.data, self.password.data) self.user = User.attempt_login(self.user_or_email.data,
self.password.data)
return True return True
except LoginError as e: except LoginError as e:
self.user_or_email.errors.append(e.msg) self.user_or_email.errors.append(e.msg)
return False return False
class ForgotPasswordForm(FlaskForm): class ForgotPasswordForm(FlaskForm):
"""Forgot password form.""" user_or_email = StringField('username or e-mail address',
[InputRequired(), Length(max=EMAIL_LEN)])
user_or_email = StringField(
"username or e-mail address", [InputRequired(), Length(max=EMAIL_LEN)]
)
class PasswordForm(FlaskForm): class PasswordForm(FlaskForm):
"""Password form.""" password = PasswordField('new password',
[InputRequired(), Length(min=4, max=PASSWORD_LEN)])
password = PasswordField(
"new password", [InputRequired(), Length(min=4, max=PASSWORD_LEN)]
)
class AccountSettingsForm(FlaskForm): class AccountSettingsForm(FlaskForm):
"""Account settings form.""" full_name = StringField('full name', [Length(max=64)])
email = StringField('e-mail address',
full_name = StringField("full name", [Length(max=64)]) [InputRequired(), Email(),
email = StringField( Length(min=5, max=EMAIL_LEN)])
"e-mail address", [InputRequired(), Email(), Length(min=5, max=EMAIL_LEN)]
)
class ChangePasswordForm(FlaskForm): class ChangePasswordForm(FlaskForm):
"""Change password form.""" old_password = PasswordField('current password',
[InputRequired(), Length(max=PASSWORD_LEN)])
old_password = PasswordField( new_password = PasswordField('new password',
"current password", [InputRequired(), Length(max=PASSWORD_LEN)] [InputRequired(), Length(max=PASSWORD_LEN)])
)
new_password = PasswordField(
"new password", [InputRequired(), Length(max=PASSWORD_LEN)]
)
class SourceDocForm(FlaskForm): class SourceDocForm(FlaskForm):
"""Source doc form.""" text = TextAreaField('text', [InputRequired()])
db_price_per_character = IntegerField('price per character', [Optional()])
text = TextAreaField("text", [InputRequired()]) db_document_price = IntegerField('document price', [Optional()])
db_price_per_character = IntegerField("price per character", [Optional()])
db_document_price = IntegerField("document price", [Optional()])
class ItemForm(FlaskForm): class ItemForm(FlaskForm):
"""Item form.""" text = TextAreaField('text', [InputRequired()])
text = TextAreaField("text", [InputRequired()])
class UploadSourceDocForm(FlaskForm): class UploadSourceDocForm(FlaskForm):
sourcedoc_file = FileField("SourceDoc", [Regexp(r"^[^/\\]+\.txt$")]) sourcedoc_file = FileField('SourceDoc', [Regexp(r'^[^/\\]+\.txt$')])

View file

@ -1,49 +1,40 @@
"""Send email.""" from flask import render_template, current_app
import smtplib
from email import charset
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.utils import formataddr, formatdate, make_msgid from email.utils import formatdate, make_msgid
from email import charset
from email.utils import formataddr
import smtplib
from flask import current_app, render_template charset.add_charset('utf-8', charset.SHORTEST, charset.QP)
from .model import User def format_message(user, subject, body):
from_name = current_app.config['FROM_NAME']
from_addr = current_app.config['FROM_ADDR']
charset.add_charset("utf-8", charset.SHORTEST, charset.QP) msg = MIMEText(body, 'plain', 'UTF-8')
msg['Subject'] = subject
msg['To'] = formataddr((user.mail_to_name, user.email))
def format_message(user: User, subject: str, body: str) -> MIMEText: msg['From'] = formataddr((from_name, from_addr))
"""Format an email.""" msg['Date'] = formatdate()
from_name = current_app.config["FROM_NAME"] msg['Message-ID'] = make_msgid()
from_addr = current_app.config["FROM_ADDR"]
msg = MIMEText(body, "plain", "UTF-8")
msg["Subject"] = subject
msg["To"] = formataddr((user.mail_to_name, user.email))
msg["From"] = formataddr((from_name, from_addr))
msg["Date"] = formatdate()
msg["Message-ID"] = make_msgid()
return msg return msg
def send_mail(user, subject, body):
def send_mail(user: User, subject: str, body: str) -> None: bounce_addr = current_app.config['FROM_ADDR']
"""Send an email."""
bounce_addr = current_app.config["FROM_ADDR"]
msg = format_message(user, subject, body) msg = format_message(user, subject, body)
msg_as_string = msg.as_string() msg_as_string = msg.as_string()
if not current_app.config["REALLY_SEND_MAIL"]: # during development if not current_app.config['REALLY_SEND_MAIL']: # during development
return return
s = smtplib.SMTP("localhost") s = smtplib.SMTP('localhost')
s.sendmail(bounce_addr, [user.email], msg_as_string) s.sendmail(bounce_addr, [user.email], msg_as_string)
s.quit() s.quit()
def send_signup_mail(user):
def send_signup_mail(user: User) -> None: ''' unused so far '''
"""Unused so far.""" subject = u'xanadu: verify your account'
subject = "xanadu: verify your account" body = render_template('mail/signup.txt', user=user)
body = render_template("mail/signup.txt", user=user)
send_mail(user, subject, body) send_mail(user, subject, body)

View file

@ -1,34 +1,20 @@
"""Models.""" from flask import url_for, current_app
from __future__ import annotations
import re
import typing
from flask import current_app, url_for
from flask_login import UserMixin
from hashids import Hashids
from sqlalchemy import Column, ForeignKey, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import configure_mappers, relationship, synonym, validates
from sqlalchemy.sql import exists
from sqlalchemy.types import (
Boolean,
DateTime,
Enum,
Integer,
String,
Unicode,
UnicodeText,
)
from sqlalchemy_continuum import make_versioned
from sqlalchemy_continuum.plugins import ActivityPlugin, FlaskPlugin
from werkzeug.security import check_password_hash, generate_password_hash
from .database import session from .database import session
from .parse import ParsedLink, SpanTuple, parse_link, parse_span from .parse import parse_link, parse_sourcedoc_facet, parse_span
from .text import first_non_empty_line from .text import first_non_empty_line
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, ForeignKey
from sqlalchemy.types import String, Unicode, Integer, DateTime, Boolean, UnicodeText, Enum
from sqlalchemy import func
from sqlalchemy.orm import relationship, validates, synonym, configure_mappers
from sqlalchemy.sql import exists
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy_continuum import make_versioned
from sqlalchemy_continuum.plugins import FlaskPlugin, ActivityPlugin
import re
from hashids import Hashids
activity_plugin = ActivityPlugin() activity_plugin = ActivityPlugin()
make_versioned(plugins=[FlaskPlugin(), activity_plugin]) make_versioned(plugins=[FlaskPlugin(), activity_plugin])
@ -38,79 +24,38 @@ doc_hashids = Hashids(min_length=8)
Base = declarative_base() Base = declarative_base()
Base.query = session.query_property() Base.query = session.query_property()
re_server_url = re.compile(r"^http://perma.pub/\d+/([^/]+)/([^/]+)$") re_server_url = re.compile(r'^http://perma.pub/\d+/([^/]+)/([^/]+)$')
# list of disallowed usernames - maybe this should be in the database # list of disallowed usernames - maybe this should be in the database
reserved_name = [ reserved_name = ['root', 'admin', 'administrator', 'support', 'info',
"root", 'test', 'tech', 'online', 'old', 'new', 'jobs', 'login', 'job', 'ipad'
"admin", 'iphone', 'javascript', 'script', 'host', 'mail', 'image', 'faq',
"administrator", 'file', 'ftp', 'error', 'warning', 'the', 'assistance', 'maintenance',
"support", 'controller', 'head', 'chief', 'anon']
"info",
"test",
"tech",
"online",
"old",
"new",
"jobs",
"login",
"job",
"ipad" "iphone",
"javascript",
"script",
"host",
"mail",
"image",
"faq",
"file",
"ftp",
"error",
"warning",
"the",
"assistance",
"maintenance",
"controller",
"head",
"chief",
"anon",
]
re_username = re.compile(r"^\w+$", re.U) re_username = re.compile(r'^\w+$', re.U)
re_full_name = re.compile(r'^([-.\'" ]|[^\W\d_])+$', re.U) re_full_name = re.compile(r'^([-.\'" ]|[^\W\d_])+$', re.U)
re_comment = re.compile(r"#.*")
AllTitles = dict["Item", str]
def item_url(): def item_url():
return url_for("view.view_item", username=self.user.username, hashid=self.hashid) return url_for('view.view_item',
username=self.user.username,
hashid=self.hashid)
def user_exists(field: Column[str], value: str) -> bool: def user_exists(field, value):
"""Check that user exists.""" return session.query(exists().where(field == value)).scalar()
return typing.cast(bool, session.query(exists().where(field == value)).scalar())
class TimeStampedModel(Base): class TimeStampedModel(Base):
"""Time stamped model."""
__abstract__ = True __abstract__ = True
created = Column(DateTime, default=func.now()) created = Column(DateTime, default=func.now())
modified = Column(DateTime, default=func.now(), onupdate=func.now()) modified = Column(DateTime, default=func.now(), onupdate=func.now())
class LoginError(Exception): class LoginError(Exception):
"""Login error.""" def __init__(self, msg):
def __init__(self, msg: str):
"""Init."""
self.msg = msg self.msg = msg
class User(TimeStampedModel, UserMixin): class User(TimeStampedModel, UserMixin):
"""User model.""" __tablename__ = 'user'
__tablename__ = "user"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
username = Column(Unicode(32), unique=True, nullable=False) username = Column(Unicode(32), unique=True, nullable=False)
pw_hash = Column(String(160), nullable=False) pw_hash = Column(String(160), nullable=False)
@ -123,358 +68,269 @@ class User(TimeStampedModel, UserMixin):
full_name = Column(Unicode(64)) full_name = Column(Unicode(64))
balance = Column(Integer, nullable=False, default=0) balance = Column(Integer, nullable=False, default=0)
user_id = synonym("id") user_id = synonym('id')
name = synonym("full_name") name = synonym('full_name')
user_name = synonym("username") user_name = synonym('username')
def __init__(self, **kwargs): def __init__(self, **kwargs):
"""Init.""" pw_hash = generate_password_hash(kwargs.pop('password'))
pw_hash = generate_password_hash(kwargs.pop("password"))
return super(User, self).__init__(pw_hash=pw_hash, **kwargs) return super(User, self).__init__(pw_hash=pw_hash, **kwargs)
def __repr__(self) -> str: def __repr__(self):
"""Repr.""" return '<User: {!r}>'.format(self.username)
return "<User: {!r}>".format(self.username)
def set_password(self, password: str) -> None: def set_password(self, password):
"""Set password."""
self.pw_hash = generate_password_hash(password) self.pw_hash = generate_password_hash(password)
def check_password(self, password: str) -> bool: def check_password(self, password):
"""Check password."""
return check_password_hash(self.pw_hash, password) return check_password_hash(self.pw_hash, password)
def get_id(self) -> int: def get_id(self):
"""Get ID."""
return self.id return self.id
@validates("email") @validates('email')
def validate_email(self, key, value: str) -> str: def validate_email(self, key, value):
"""Validate email.""" assert '@' in value
assert "@" in value
return value return value
@validates("username") @validates('username')
def validate_usernane(self, key, value): def validate_usernane(self, key, value):
"""Validate username."""
assert re_username.match(value) assert re_username.match(value)
return value return value
@validates("full_name") @validates('full_name')
def validate_full_name(self, key, value): def validate_full_name(self, key, value):
if value: if value:
assert re_full_name.match(value) assert re_full_name.match(value)
return value return value
@hybrid_property @hybrid_property
def is_live(self) -> bool: def is_live(self):
"""User account is live.""" return self.email_verified & ~self.disabled & ~self.deleted
return bool(self.email_verified & ~self.disabled & ~self.deleted)
@classmethod @classmethod
def lookup_user_or_email(cls, user_or_email: str) -> User: def lookup_user_or_email(cls, user_or_email):
"""Lookup user or email.""" field = cls.email if '@' in user_or_email else cls.username
field = cls.email if "@" in user_or_email else cls.username return cls.query.filter(field == user_or_email).one_or_none()
return typing.cast(User, cls.query.filter(field == user_or_email).one_or_none())
@property @property
def mail_to_name(self) -> str: def mail_to_name(self):
"""Name to use on e-mails sent to the user.""" '''Name to use on e-mails sent to the user.'''
return self.full_name or self.username return self.full_name or self.username
@classmethod @classmethod
def attempt_login(cls, user_or_email: str, password: str) -> User: def attempt_login(cls, user_or_email, password):
"""Attempt login."""
user = cls.lookup_user_or_email(user_or_email) user = cls.lookup_user_or_email(user_or_email)
if not user: if not user:
raise LoginError("user not found") raise LoginError('user not found')
if user.disabled: if user.disabled:
raise LoginError("user account disabled") raise LoginError('user account disabled')
if not user.check_password(password): if not user.check_password(password):
raise LoginError("incorrect password") raise LoginError('incorrect password')
return user return user
class Reference(Base): class Reference(Base):
__tablename__ = "reference" __tablename__ = 'reference'
subject_id = Column(Integer, ForeignKey("item.id"), primary_key=True) subject_id = Column(Integer, ForeignKey('item.id'), primary_key=True)
object_id = Column(Integer, ForeignKey("item.id"), primary_key=True) object_id = Column(Integer, ForeignKey('item.id'), primary_key=True)
class Item(TimeStampedModel): class Item(TimeStampedModel):
__tablename__ = "item" __tablename__ = 'item'
__versioned__ = {"base_classes": (TimeStampedModel,)} __versioned__ = {'base_classes': (TimeStampedModel,)}
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("user.id")) user_id = Column(Integer, ForeignKey('user.id'))
published = Column(DateTime) published = Column(DateTime)
type = Column( type = Column(Enum('sourcedoc', 'xanapage', 'xanalink', name='item_type'),
Enum("sourcedoc", "xanapage", "xanalink", name="item_type"), nullable=False nullable=False)
)
filename = Column(Unicode) filename = Column(Unicode)
text = Column(UnicodeText) text = Column(UnicodeText)
subjects = relationship( subjects = relationship('Item',
"Item", lazy='dynamic',
lazy="dynamic", secondary='reference',
secondary="reference", primaryjoin=id == Reference.object_id,
primaryjoin=id == Reference.object_id, secondaryjoin=id == Reference.subject_id)
secondaryjoin=id == Reference.subject_id, objects = relationship('Item',
) lazy='dynamic',
objects = relationship( secondary='reference',
"Item", primaryjoin=id == Reference.subject_id,
lazy="dynamic", secondaryjoin=id == Reference.object_id)
secondary="reference", user = relationship('User', backref='items')
primaryjoin=id == Reference.subject_id,
secondaryjoin=id == Reference.object_id,
)
user = relationship("User", backref="items", lazy="select")
__mapper_args__ = { __mapper_args__ = {
"polymorphic_on": type, 'polymorphic_on': type,
"with_polymorphic": "*", 'with_polymorphic': '*',
} }
@property @property
def hashid(self) -> str: def hashid(self):
"""Hashid for item.""" return doc_hashids.encode(self.id)
return typing.cast(str, doc_hashids.encode(self.id))
@classmethod @classmethod
def get_by_hashid(cls, hashid: str) -> Item | None: def get_by_hashid(cls, hashid):
"""Return the item with the given hashid."""
try: try:
item_id = doc_hashids.decode(hashid)[0] item_id = doc_hashids.decode(hashid)[0]
except IndexError: except IndexError:
return None return
return typing.cast("Item", cls.query.get(item_id)) return cls.query.get(item_id)
def view_url(self, endpoint: str, **kwargs) -> str: def view_url(self, endpoint, **kwargs):
return url_for( return url_for('view.' + endpoint,
"view." + endpoint, username=self.user.username,
username=self.user.username, hashid=self.hashid,
hashid=self.hashid, **kwargs)
**kwargs,
)
@property @property
def url(self) -> str: def url(self):
"""URL for to view this item.""" return self.view_url('view_item')
return self.view_url("view_item")
def url_fragment(self) -> str: def url_fragment(self):
"""URL fragment.""" return self.user.username + '/' + self.hashid
return self.user.username + "/" + self.hashid
def version_url(self, version: int) -> str: def version_url(self, version):
"""URL for version.""" return self.view_url('view_item', v=version)
return self.view_url("view_item", v=version)
@property @property
def history_url(self) -> str: def history_url(self):
"""History URL.""" return self.view_url('history')
return self.view_url("history")
@property @property
def external_url(self) -> str: def external_url(self):
"""External URL.""" base_url = current_app.config.get('BASE_URL')
base_url = current_app.config.get("BASE_URL") if not base_url.endswith('/'):
assert base_url and isinstance(base_url, str) base_url += '/'
if not base_url.endswith("/"):
base_url += "/"
if base_url: if base_url:
return base_url + self.url_fragment() return base_url + self.url_fragment()
else: else:
return self.view_url("view_item", _external=True) return self.view_url('view_item', _external=True)
@property @property
def edit_url(self) -> str: def edit_url(self):
"""Edit URL.""" return self.view_url('edit_item')
return self.view_url("edit_item")
@property @property
def set_title_url(self) -> str: def set_title_url(self):
"""Set title URL.""" return self.view_url('set_title')
return self.view_url("set_title")
def title_from_link(self, titles: AllTitles | None = None) -> str | None: def title_from_link(self, titles=None):
"""Get title from link.""" if not titles:
if titles is None:
titles = XanaLink.get_all_titles() titles = XanaLink.get_all_titles()
return titles.get(self) return titles.get(self)
def title(self, titles: AllTitles | None = None) -> str: def title(self, titles=None):
"""Get title.""" return self.type + ': ' + (self.title_from_link(titles) or self.hashid)
return self.type + ": " + (self.title_from_link(titles) or self.hashid)
def has_title(self) -> bool: def has_title(self):
"""Item has a title."""
titles = XanaLink.get_all_titles() titles = XanaLink.get_all_titles()
return self in titles return self in titles
def set_title(self, title: str, user: User) -> None: def set_title(self, title, user):
"""Set item title."""
title_source_doc = SourceDoc(text=title, user=user) title_source_doc = SourceDoc(text=title, user=user)
session.add(title_source_doc) session.add(title_source_doc)
session.commit() session.commit()
link_text = """type=title link_text = '''type=title
facet= facet=
sourcedoc: {} sourcedoc: {}
facet= facet=
span: {},start=0,length={}""".format( span: {},start=0,length={}'''.format(self.external_url, title_source_doc.external_url, len(title))
self.external_url, title_source_doc.external_url, len(title)
)
title_link = XanaLink(text=link_text, user=user) title_link = XanaLink(text=link_text, user=user)
session.add(title_link) session.add(title_link)
session.commit() session.commit()
@classmethod @classmethod
def from_external(cls, url: str, home: str | None = None) -> None | Item: def from_external(cls, url, home=None):
"""Get item from URL.""" base = current_app.config.get('BASE_URL')
username: str | None
hashid: str | None
parts = url.split("/")
username, hashid = parts[-2:]
item_id = doc_hashids.decode(hashid)[0]
q = cls.query.filter(User.username == username, cls.id == item_id)
item = q.one_or_none()
if item:
return typing.cast(Item, item)
base = current_app.config.get("BASE_URL")
username, hashid = None, None username, hashid = None, None
if home is None: if home is None:
home = url_for("view.home", _external=True) home = url_for('view.home', _external=True)
if url.startswith(home): if url.startswith(home):
username, _, hashid = url[len(home) :].partition("/") username, _, hashid = url[len(home):].partition('/')
elif base and url.startswith(base): elif base and url.startswith(base):
username, _, hashid = url[len(base) :].lstrip("/").partition("/") username, _, hashid = url[len(base):].lstrip('/').partition('/')
if username and "/" in username or hashid and "/" in hashid: if username and '/' in username or hashid and '/' in hashid:
username, hashid = None, None username, hashid = None, None
if not username or not hashid: if not username or not hashid:
m = re_server_url.match(url) m = re_server_url.match(url)
if not m: if not m:
return None return
username, hashid = m.groups() username, hashid = m.groups()
item_id = doc_hashids.decode(hashid)[0] item_id = doc_hashids.decode(hashid)[0]
q = cls.query.filter(User.username == username, cls.id == item_id) q = cls.query.filter(User.username == username, cls.id == item_id)
return q.one_or_none() return q.one_or_none()
class XanaPage(Item): class XanaPage(Item):
__tablename__ = "xanapage" __tablename__ = 'xanapage'
__mapper_args__ = {"polymorphic_identity": "xanapage"} __mapper_args__ = {'polymorphic_identity': 'xanapage'}
id = Column(Integer, ForeignKey(Item.id), primary_key=True) id = Column(Integer, ForeignKey(Item.id), primary_key=True)
def snippet(self) -> str: def snippet(self):
"""Snippet of text."""
return self.text return self.text
@property @property
def xanaedit_url(self) -> str: def xanaedit_url(self):
"""XanaExit URL.""" return self.view_url('xanaedit_item')
return self.view_url("xanaedit_item")
@property @property
def save_xanaedit_url(self) -> str: def save_xanaedit_url(self):
"""XanaExit save URL.""" return self.view_url('save_xanaedit')
return self.view_url("save_xanaedit")
def iter_spans(self) -> typing.Iterator[SpanTuple]:
"""Span iterator."""
assert self.text is not None
for line in self.text.splitlines():
line = re_comment.sub("", line).strip()
if not line:
continue
span_pointer = parse_span(line)
if span_pointer:
yield span_pointer
def update_references(self) -> None:
"""Update references."""
for url, start, length in self.iter_spans():
src_doc = Item.from_external(url)
if not src_doc or not src_doc.id:
continue
existing = Reference.query.get((self.id, src_doc.id))
if existing:
continue
ref = Reference(subject_id=self.id, object_id=src_doc.id)
session.add(ref)
session.commit()
class XanaLink(Item): class XanaLink(Item):
"""XanaLink.""" __tablename__ = 'xanalink'
__mapper_args__ = {'polymorphic_identity': 'xanalink'}
__tablename__ = "xanalink"
__mapper_args__ = {"polymorphic_identity": "xanalink"}
id = Column(Integer, ForeignKey(Item.id), primary_key=True) id = Column(Integer, ForeignKey(Item.id), primary_key=True)
def parse(self) -> ParsedLink: def parse(self):
"""Parse link."""
assert self.text is not None
return parse_link(self.text) return parse_link(self.text)
@property @property
def link_type(self) -> str: def link_type(self):
"""Get link type.""" return self.parse()['type']
return self.parse()["type"]
def title(self, titles: AllTitles | None = None) -> str: def title(self, titles=None):
"""Title of link."""
if titles is None: if titles is None:
titles = XanaLink.get_all_titles() titles = XanaLink.get_all_titles()
if self in titles: if self in titles:
return self.type + ": " + titles[self] return self.type + ': ' + titles[self]
parsed = self.parse() parsed = self.parse()
assert isinstance(parsed["type"], str)
if parsed["type"] == "title": if parsed['type'] == 'title':
ident = parsed["facets"][0][0].partition(": ")[2] ident = parsed['facets'][0][0].partition(': ')[2]
item = Item.from_external(ident) item = Item.from_external(ident)
if item in titles: if item in titles:
return parsed["type"] + " link for " + item.title(titles=titles) return parsed['type'] + " link for " + item.title(titles=titles)
if parsed["type"]: if parsed['type']:
return parsed["type"] + " link: " + self.hashid return parsed['type'] + " link: " + self.hashid
else: else:
return "link: " + self.hashid return "link: " + self.hashid
def item_and_title(self, home=None): def item_and_title(self, home=None):
link = self.parse() link = self.parse()
if link["type"] != "title": if link['type'] != 'title':
return return
try: try:
facet1, facet2 = link["facets"] facet1, facet2 = link['facets']
except ValueError: except ValueError:
return return
link_type, _, ident = facet1[0].partition(": ") link_type, _, ident = facet1[0].partition(': ')
item = Item.from_external(ident, home) item = Item.from_external(ident, home)
try: ident2, start, length = parse_span(facet2[0])
ident2, start, length = parse_span(facet2[0])
except TypeError:
return
source_of_title = SourceDoc.from_external(ident2, home) source_of_title = SourceDoc.from_external(ident2, home)
if source_of_title: if source_of_title:
return (item, source_of_title.text[start : length + start]) return(item, source_of_title.text[start:length + start])
@classmethod @classmethod
def get_all_titles(cls, home: str | None = None) -> AllTitles: def get_all_titles(cls, home=None):
"""Get all known titles."""
titles = {} titles = {}
for link in cls.query: for link in cls.query:
ret = link.item_and_title(home) ret = link.item_and_title(home)
@ -484,14 +340,12 @@ class XanaLink(Item):
titles[item] = title titles[item] = title
return titles return titles
def snippet(self) -> str | None: def snippet(self):
"""Snippet of text."""
return self.text return self.text
class SourceDoc(Item): class SourceDoc(Item):
__tablename__ = "sourcedoc" __tablename__ = 'sourcedoc'
__mapper_args__ = {"polymorphic_identity": "sourcedoc"} __mapper_args__ = {'polymorphic_identity': 'sourcedoc'}
id = Column(Integer, ForeignKey(Item.id), primary_key=True) id = Column(Integer, ForeignKey(Item.id), primary_key=True)
db_price_per_character = Column(Integer) db_price_per_character = Column(Integer)
@ -499,61 +353,44 @@ class SourceDoc(Item):
@property @property
def document_price(self): def document_price(self):
assert self.text is not None
return self.db_document_price or self.db_price_per_character * len(self.text) return self.db_document_price or self.db_price_per_character * len(self.text)
@property @property
def price_per_character(self): def price_per_character(self):
assert self.text is not None
return self.db_price_per_character or self.db_document_price / len(self.text) return self.db_price_per_character or self.db_document_price / len(self.text)
def snippet( def snippet(self, length=255, killwords=False, end='...', leeway=5):
self,
length: int = 255,
killwords: bool = False,
end: str = "...",
leeway: int = 5,
) -> str:
"""Get snippet of text."""
assert self.text is not None
s = self.text s = self.text
assert length >= len(end), "expected length >= %s, got %s" % (len(end), length) assert length >= len(end), 'expected length >= %s, got %s' % (len(end), length)
assert leeway >= 0, "expected leeway >= 0, got %s" % leeway assert leeway >= 0, 'expected leeway >= 0, got %s' % leeway
if len(s) <= length + leeway: if len(s) <= length + leeway:
return s return s
if killwords: if killwords:
return s[: length - len(end)] + end return s[:length - len(end)] + end
result = s[: length - len(end)].rsplit(" ", 1)[0] result = s[:length - len(end)].rsplit(' ', 1)[0]
return result + end return result + end
def raw_title(self) -> str: def raw_title(self):
"""Raw title."""
return self.title(with_type=False) return self.title(with_type=False)
def title(self, titles: AllTitles | None = None, with_type: bool = True) -> str: def title(self, titles=None, with_type=True):
"""Source document title.""" start = self.type + ': ' if with_type else ''
start = self.type + ": " if with_type else "" titles = XanaLink.get_all_titles()
if titles is None:
titles = XanaLink.get_all_titles()
from_link = self.title_from_link(titles=titles) from_link = self.title_from_link(titles=titles)
if from_link: if from_link:
return start + from_link return start + from_link
assert self.text is not None
first_line = first_non_empty_line(self.text) first_line = first_non_empty_line(self.text)
if first_line: if first_line:
return start + first_line return start + first_line
return start + self.hashid return start + self.hashid
@property @property
def create_xanapage_url(self) -> str: def create_xanapage_url(self):
"""Create xanapage URL.""" return self.view_url('create_xanapage_from_sourcedoc')
return self.view_url("create_xanapage_from_sourcedoc")
@property @property
def entire_span(self) -> str: def entire_span(self):
"""Entire span.""" return self.external_url + f',start=0,length={len(self.text)}'
assert self.text is not None
return self.external_url + f",start=0,length={len(self.text)}"
configure_mappers() configure_mappers()

View file

@ -1,60 +1,34 @@
import os.path
import re import re
import typing import os.path
from html import escape from html import escape
from .types import ExternalText re_span_pointer = re.compile(r'span: (.*),start=(\d+),length=(\d+)')
re_xanalink = re.compile('xanalink: +([^ ]+) *$')
re_span_pointer = re.compile(r"span: (.*),start=(\d+),length=(\d+)") re_facet = re.compile('^facet\d* *=\s*(.*)\s*$')
re_xanalink = re.compile(r"xanalink: +([^ ]+) *$") re_comment = re.compile(r'#.*')
re_facet = re.compile(r"^facet\d* *=\s*(.*)\s*$")
re_comment = re.compile(r"#.*")
project_dir = os.path.dirname(os.path.dirname(__file__)) project_dir = os.path.dirname(os.path.dirname(__file__))
xnb_per_char = 150000 xnb_per_char = 150000
def parse_span(line):
class SpanContents(typing.TypedDict):
"""Contents of a span."""
url: str
start: int
length: int
text: str
SpanTuple = tuple[str, int, int]
SourceText = dict[str, ExternalText]
ParsedLink = dict[str, str | None | list[list[str]]]
def parse_span(line: str) -> None | SpanTuple:
"""Parse a span."""
m = re_span_pointer.match(line) m = re_span_pointer.match(line)
if not m: if not m:
return None return None
return (m.group(1), int(m.group(2)), int(m.group(3))) return (m.group(1), int(m.group(2)), int(m.group(3)))
def get_span(text, url, start, length):
def get_span(text: SourceText, url: str, start: int, length: int) -> SpanContents:
"""Get span from source text."""
return { return {
"url": url, 'url': url,
"start": start, 'start': start,
"length": length, 'length': length,
"text": text[url]["text"][start : start + length], 'text': text[url]['text'][start:start + length]
} }
def get_urls(spans):
def get_urls(spans: list[SpanTuple]) -> set[str]:
"""Get URLs from span tuples."""
return {i[0] for i in spans} return {i[0] for i in spans}
def find_min_max(spans, source):
def find_min_max(spans: list[SpanTuple], source):
text_min, text_max = {}, {} text_min, text_max = {}, {}
for url, start, length in spans: for url, start, length in spans:
if url in text_min: if url in text_min:
@ -68,85 +42,67 @@ def find_min_max(spans: list[SpanTuple], source):
text_max[url] = start + length text_max[url] = start + length
for s in source: for s in source:
url = s["url"] url = s['url']
s["min"] = text_min[url] s['min'] = text_min[url]
s["max"] = text_max[url] s['max'] = text_max[url]
def span_html(span_type, num):
return '<span class="{span_type} sourcedoc{span_type}" id="{span_type}{num}">'.format(num=num, span_type=span_type)
def span_html(span_type: str, num: int) -> str: def xanapage_span_html(num, text, url, start, length, highlight=True, censor=False):
"""Open span tag."""
return f'<span class="{span_type} sourcedoc{span_type}" id="{span_type}{num}">'
def xanapage_span_html(
num: int,
text: str,
url: str,
start: int,
length: int,
highlight: bool = True,
censor: bool = False,
) -> str:
"""Generate HTML to represent a span."""
cls = [] cls = []
if highlight: if highlight:
cls = ["xanapagetransclusion", "transclusion"] cls = ['xanapagetransclusion', 'transclusion']
html_class = f""" class="{' '.join(cls)}""" if cls else "" html_class = ' class="{}"'.format(' '.join(cls)) if cls else ''
html = ( html = '<span id="span{}"{} data-url="{}" data-start="{}" data-length="{}">{}</span>'.format(num, html_class, escape(url), start, length, text)
f'<span id="span{num}"{html_class} data-url="{escape(url)}" '
+ f'data-start="{start}" data-length="{length}">{text}</span>'
)
if censor: if censor:
return '<span class="censor">' + html + "</span>" return '<span class="censor">' + html + '</span>'
else: else:
return html return html
def parse_sourcedoc_facet(facet):
def parse_sourcedoc_facet(facet: list[str]) -> str:
"""Parse sourcedoc facet."""
leg = facet[0] leg = facet[0]
prefix = "sourcedoc: " prefix = 'sourcedoc: '
assert leg.startswith(prefix) assert leg.startswith(prefix)
return leg[len(prefix) :] return leg[len(prefix):]
def parse_xanapage_facet(facet):
def parse_xanapage_facet(facet: list[str]) -> str:
"""Parse xanapage facet."""
leg = facet[0] leg = facet[0]
prefix = "xanapage: " prefix = 'xanapage: '
assert leg.startswith(prefix) assert leg.startswith(prefix)
return leg[len(prefix) :] return leg[len(prefix):]
def parse_link(link_text):
def parse_link(link_text: str) -> ParsedLink:
link_type = None link_type = None
expect = "link_type" expect = 'link_type'
facets = [] facets = []
for line in link_text.splitlines(): for line in link_text.splitlines():
line = re_comment.sub("", line).strip() line = re_comment.sub('', line).strip()
if not line: if not line:
continue continue
if expect == "link_type": if expect == 'link_type':
if line.startswith("type="): if line.startswith('type='):
link_type = line[5:] link_type = line[5:]
expect = "facets" expect = 'facets'
continue continue
if expect != "facets": if expect != 'facets':
# print("unrecognized:", line) # print("unrecognized:", line)
continue continue
m = re_facet.match(line) m = re_facet.match(line)
if m: if m:
legs: list[str] = [] legs = []
facets.append(legs) facets.append(legs)
if m.group(1): if m.group(1):
line = m.group(1) line = m.group(1)
else: else:
continue continue
if legs and legs[-1] == "span:" and line.startswith("http"): if legs and legs[-1] == 'span:' and line.startswith('http'):
legs[-1] += " " + line legs[-1] += ' ' + line
else: else:
legs.append(line.strip()) legs.append(line.strip())
return {"type": link_type, "facets": facets} return {'type': link_type, 'facets': facets}

View file

@ -1,35 +1,21 @@
"""Span."""
import typing
import attr import attr
import attr._make
def greater_than_zero(instance, attribute, value):
def greater_than_zero(instance: "Span", attribute: typing.Any, value: int) -> None:
"""Value is greater than zero."""
if value <= 0: if value <= 0:
raise ValueError("must be greater than 0") raise ValueError('must be greater than 0')
def is_positive(instance, attribute, value):
def is_positive(instance: "Span", attribute: typing.Any, value: int) -> None:
"""Value is positive."""
if value < 0: if value < 0:
raise ValueError("must be positive") raise ValueError('must be positive')
@attr.s @attr.s
class Span: class Span:
"""Span.""" url: int = attr.ib()
url: str = attr.ib()
start: int = attr.ib(validator=is_positive) start: int = attr.ib(validator=is_positive)
length: int = attr.ib(validator=greater_than_zero) length: int = attr.ib(validator=greater_than_zero)
def end(self) -> int: def end(self) -> int:
"""End position of span."""
return self.start + self.length return self.start + self.length
def for_edl(self) -> str: def for_edl(self) -> str:
"""Generate URL parameters for EDL.""" return f'{self.url},start={self.start},length={self.length}'
return f"{self.url},start={self.start},length={self.length}"

View file

@ -1 +1 @@
/usr/share/javascript/bootstrap4 /usr/share/javascript/bootstrap

View file

@ -7,9 +7,7 @@
<title>{% block title %}Xanadu{% endblock %}</title> <title>{% block title %}Xanadu{% endblock %}</title>
<link href="https://fonts.googleapis.com/css?family=Neuton" rel="stylesheet"> <link href="https://fonts.googleapis.com/css?family=Neuton" rel="stylesheet">
{#<link rel="stylesheet" href="{{ url_for('static', filename='bootstrap4/css/bootstrap.css') }}"> #} <link rel="stylesheet" href="{{ url_for('static', filename='bootstrap4/css/bootstrap.css') }}">
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.1/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-4bw+/aepP/YC94hEpVNVgiZdgIC5+VKNBQNGCHeKRQN+PtmoHDEXuppvnDJzQIu9" crossorigin="anonymous">
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}"> <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
{% block style %} {% block style %}
@ -30,12 +28,8 @@
<footer>&copy; 2017 Project Xanadu</footer> <footer>&copy; 2017 Project Xanadu</footer>
</div> </div>
{#
<script src="{{ url_for('static', filename='jquery/jquery.js') }}"></script> <script src="{{ url_for('static', filename='jquery/jquery.js') }}"></script>
<script src="{{ url_for('static', filename='bootstrap4/js/bootstrap.js') }}"></script> <script src="{{ url_for('static', filename='bootstrap4/js/bootstrap.js') }}"></script>
#}
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.1/dist/js/bootstrap.bundle.min.js" integrity="sha384-HwwvtgBNo3bZJJLYd8oVXjrBZt8cqVSpeBNS5n7C8IVInixGAoxmnlMuBnhbgrkm" crossorigin="anonymous"></script>
{% block scripts %} {% block scripts %}
{% endblock %} {% endblock %}

View file

@ -68,12 +68,8 @@
{% for doc in docs %} {% for doc in docs %}
<div data-id="{{ doc.id }}" class="card border-primary my-2"> <div data-id="{{ doc.id }}" class="card border-primary my-2">
<h5 class="card-header"> <h5 class="card-header"><a href="{{ doc.url }}">{{ doc.title() }}</a>
<a href="{{ doc.url }}">{{ doc.title(titles=titles) }}</a> &mdash; {{ doc.user.username }} &mdash; {{ doc.created | datetime }}</h5>
{#
&mdash; {{ doc.user.username }} &mdash; {{ doc.created | datetime }}
#}
</h5>
<div class="card-body"> <div class="card-body">
<p class="card-text"> <p class="card-text">
{%- for line in doc.snippet().splitlines() -%} {%- for line in doc.snippet().splitlines() -%}
@ -83,7 +79,6 @@
</div> </div>
</div> </div>
{% endfor %} {% endfor %}
<div id="lower-buttons"> <div id="lower-buttons">
{{ new_buttons() }} {{ new_buttons() }}
</div> </div>
@ -103,8 +98,7 @@
var show_type = document.getElementById('type_' + doc['type']).checked; var show_type = document.getElementById('type_' + doc['type']).checked;
var show_user = document.getElementById('user_' + doc['user']).checked; var show_user = document.getElementById('user_' + doc['user']).checked;
var show_year = document.getElementById('year_' + doc['year']).checked; var show_year = document.getElementById('year_' + doc['year']).checked;
var show_link_type = (doc['type'] != 'xanalink' || var show_link_type = doc['type'] != 'xanalink' || document.getElementById('link_type_' + doc['link_type']).checked;
(doc['link_type'] && document.getElementById('link_type_' + doc['link_type']).checked));
element.toggle(show_type && show_user && show_link_type && show_year); element.toggle(show_type && show_user && show_link_type && show_year);
}); });

View file

@ -3,7 +3,7 @@
<button class="navbar-toggler navbar-toggler-right" type="button" data-toggle="collapse" data-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation"> <button class="navbar-toggler navbar-toggler-right" type="button" data-toggle="collapse" data-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span> <span class="navbar-toggler-icon"></span>
</button> </button>
<a class="navbar-brand" href="{{ url_for('.home') }}">Xanaflight: Pale Fire, by Nabokov</a> <a class="navbar-brand" href="{{ url_for('.home') }}">Xanaflight: Three pages</a>
<div class="collapse navbar-collapse" id="navbarSupportedContent"> <div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav mr-auto"> <ul class="navbar-nav mr-auto">

View file

@ -94,7 +94,7 @@
</li> </li>
{% if doc.type == 'xanapage' %} {% if doc.type == 'xanapage' %}
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="{{ doc.url }}/fulfil">Xanaviewer</a> <a class="nav-link" href="{{ doc.url }}/fulfil">In view</a>
</li> </li>
{# {#
<li class="nav-item"> <li class="nav-item">

View file

@ -1,18 +1,12 @@
"""Text utility functions."""
import random
import re import re
import typing import random
re_newline = re.compile("\r?\n") re_newline = re.compile('\r?\n')
def find_newlines(text):
def find_newlines(text: str) -> typing.Iterator[int]:
"""Find position of newlines in a piece of text."""
return (m.end(0) for m in re_newline.finditer(text)) return (m.end(0) for m in re_newline.finditer(text))
def iter_lines(text):
def iter_lines(text: str) -> typing.Iterator[tuple[int, str]]:
start = 0 start = 0
for m in re_newline.finditer(text): for m in re_newline.finditer(text):
end = m.end(0) end = m.end(0)
@ -21,42 +15,33 @@ def iter_lines(text: str) -> typing.Iterator[tuple[int, str]]:
if start < len(text) - 1: if start < len(text) - 1:
yield (start, text[start:]) yield (start, text[start:])
def censor_text(text):
def censor_text(text: str) -> str: def random_chr():
"""Censor text with a random character."""
def random_chr() -> str:
return chr(random.randint(9728, 9983)) return chr(random.randint(9728, 9983))
return ''.join(random_chr() if c.isalnum() else c for c in text)
return "".join(random_chr() if c.isalnum() else c for c in text) def add_highlight(text, span_start, span_length):
def add_highlight(text: str, span_start: int, span_length: int):
span_end = span_start + span_length span_end = span_start + span_length
for start, line in iter_lines(text): for start, line in iter_lines(text):
end = start + len(line) end = start + len(line)
if not ((end > span_start) and (start < span_end)): if not ((end > span_start) and (start < span_end)):
yield start, [{"text": line}] yield start, [{'text': line}]
continue continue
within_line = span_start - start within_line = span_start - start
cur = [] cur = []
if within_line > 0: if within_line > 0:
before = line[:within_line] before = line[:within_line]
cur.append({"text": before}) cur.append({'text': before})
cur.append({"highlight": line[max(within_line, 0) : within_line + span_length]}) cur.append({'highlight': line[max(within_line, 0):within_line + span_length]})
after = line[within_line + span_length :] after = line[within_line + span_length:]
if after: if after:
cur.append({"text": after}) cur.append({'text': after})
yield start, cur yield start, cur
def first_non_empty_line(text):
def first_non_empty_line(text: str) -> str | None:
"""First non-empty line."""
for start, cur in iter_lines(text): for start, cur in iter_lines(text):
tidy = cur.strip() tidy = cur.strip()
if tidy: if tidy:
return tidy return tidy
return None

View file

@ -1,53 +1,42 @@
import requests
from .model import Item
import os.path import os.path
import re import re
import typing
import requests
from .model import Item
from .types import ExternalText
# Directory two levels above this module, and the "cache" directory inside it.
project_dir = os.path.dirname(os.path.dirname(__file__))
cache_location = os.path.join(project_dir, "cache")

# Matches any run of "/" and ":" characters.
re_colon_slash = re.compile("[/:]+")


def url_filename(url: str) -> str:
    """Generate a filename from a URL.

    Every run of ``/`` and ``:`` characters in *url* is collapsed into a
    single underscore; all other characters are kept unchanged.
    """
    return re_colon_slash.sub("_", url)
def get_text(url: str) -> ExternalText:
    """Fetch the text at *url* and return it with basic metadata.

    The result has four keys: ``url`` (the argument), ``text`` (the content
    returned by ``get_url``), ``heading`` (the part of the URL after the
    last ``/``) and ``length`` (the number of characters in the text).
    """
    # assume UTF-8
    text = get_url(url)
    heading = url.rsplit("/", 1)[-1]
    return {
        "url": url,
        "text": text,
        "heading": heading,
        "length": len(text),
    }
def get_url(url: str) -> str:
    """Read a URL and return its content as text.

    If ``Item.from_external`` returns an item for *url*, that item's text is
    returned. Otherwise the URL is fetched with ``requests`` and the response
    body is decoded with the default codec, replacing undecodable bytes.
    """
    item = Item.from_external(url)
    if item:
        return typing.cast(str, item.text)

    # NOTE(review): an on-disk cache (files under ``cache_location`` named by
    # ``url_filename``) used to follow this return. It could never run and it
    # opened files without closing them, so it has been dropped. Restore it
    # deliberately if caching is wanted.
    content = requests.get(url).content
    return content.decode(errors="replace")

View file

@ -1,29 +1,26 @@
from datetime import date, datetime, timedelta
import humanize import humanize
from datetime import date, timedelta
from jinja2 import Markup
def display_datetime(dt):
def display_datetime(dt: datetime) -> str:
"""Render datetime as a string for display."""
if dt is None: if dt is None:
return "n/a" return 'n/a'
today = date.today() today = date.today()
if today - dt.date() < timedelta(days=1): if today - dt.date() < timedelta(days=1):
return humanize.naturaltime(dt) return humanize.naturaltime(dt)
else: else:
return dt.strftime("%a, %d %b %Y") return dt.strftime('%a, %d %b %Y')
def nbsp_at_start(line: str) -> str:
    """Protect spaces at the start of a string.

    Each leading ASCII space in *line* is replaced by a no-break space
    (U+00A0). Other leading whitespace, such as tabs, is left untouched, and
    so is everything after the first non-space character.
    """
    rest = line.lstrip(" ")
    space_count = len(line) - len(rest)
    # The literal U+00A0 character is used rather than an HTML "&nbsp;"
    # entity, so the result is a plain str.
    return "\u00A0" * space_count + rest
def protect_start_spaces(text: str) -> str:
    """Protect the leading spaces of every line in *text*.

    The text is split with ``str.splitlines``, each line is passed through
    ``nbsp_at_start``, and the results are joined with ``"\\n"``. Original
    line terminators, including a trailing one, are therefore not preserved.
    """
    return "\n".join(nbsp_at_start(line) for line in text.splitlines())

File diff suppressed because it is too large Load diff