sourcing/sourcing/view.py

327 lines
11 KiB
Python

from flask import (Blueprint, render_template, request, redirect, flash,
url_for, abort, jsonify, Response, current_app)
from flask_login import (login_user, current_user, logout_user,
login_required, LoginManager)
from .forms import (LoginForm, SignupForm, AccountSettingsForm,
UploadSourceDocForm, SourceDocForm, ItemForm,
ForgotPasswordForm, PasswordForm)
from .model import User, SourceDoc, Item, XanaDoc, XanaLink
from .url import get_url
from .mail import send_mail
from .edl import fulfil_edl_with_sources
from .database import session
from .text import iter_lines, add_highlight
from werkzeug.debug.tbtools import get_current_traceback
from jinja2 import evalcontextfilter, Markup
from functools import wraps
from .utils import nbsp_at_start
from itsdangerous import URLSafeTimedSerializer
import re
login_manager = LoginManager()
login_manager.login_view = '.login'
re_paragraph = re.compile(r'(?:\r\n|\r|\n){2,}')
re_spanpointer = re.compile(r'([A-Za-z0-9]+),start=(\d+),length=(\d+)')
bp = Blueprint('view', __name__)
def init_app(app):
login_manager.init_app(app)
app.register_blueprint(bp)
@app.template_filter()
@evalcontextfilter
def newline_html(eval_ctx, value):
return u'\n\n'.join(Markup(u'<p>') + p.replace('\n', Markup('<br>')) + Markup(u'</p>')
for p in re_paragraph.split(value))
@login_manager.user_loader
def load_user(user_id):
return User.query.get(user_id)
# where do we redirect after signup is complete
view_after_signup = '.home'
@bp.context_processor
def inject_user():
return dict(current_user=current_user)
def show_errors(f):
@wraps(f)
def wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception:
traceback = get_current_traceback(skip=1, show_hidden_frames=False,
ignore_system_exceptions=True)
return traceback.render_full().encode('utf-8', 'replace')
return wrapper
@bp.route('/')
def home():
docs = Item.query.order_by(Item.created)
return render_template('home.html', docs=docs)
@bp.route('/password_reset', methods=['GET', 'POST'])
def password_reset():
site_name = 'perma.pub' # FIXME: move to config
form = ForgotPasswordForm()
if not form.validate_on_submit():
return render_template('auth/password_reset.html', form=form)
ts = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
user = User.lookup_user_or_email(form.user_or_email.data)
if user:
token = ts.dumps(user.id, salt='password-reset')
reset_link = url_for('.reset_with_token', token=token, _external=True)
reset_mail = render_template('mail/password_reset.txt',
reset_link=reset_link,
site_name=site_name,
user=user)
subject = 'Password reset on ' + site_name
send_mail(user, subject, reset_mail)
return redirect(url_for('.password_reset_sent'))
@bp.route('/password_reset/sent', methods=['GET', 'POST'])
def password_reset_sent():
return render_template('auth/password_reset_sent.html')
@bp.route('/reset/<token>', methods=['GET', 'POST'])
def reset_with_token(token):
ts = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
try:
user_id = ts.loads(token, salt='password-reset', max_age=86400)
except:
abort(404)
form = PasswordForm()
if not form.validate_on_submit():
return render_template('auth/password_reset_confirm.html', form=form)
user = User.query.get(user_id)
user.set_password(form.password.data)
session.add(user)
session.commit()
return redirect(url_for('.password_reset_complete'))
@bp.route('/reset/done')
def password_reset_complete():
return render_template('auth/password_reset_complete.html')
@bp.route('/source_doc_upload', methods=["POST"])
@show_errors
def source_doc_upload():
f = request.files['sourcedoc_file']
text = f.read()
doc = SourceDoc(text=text, user=current_user, filename=f.filename)
session.add(doc)
session.commit()
flash('new source document uploaded')
return redirect(doc.url)
@bp.route('/about')
def about():
return render_template('about.html')
@bp.route('/contact')
def contact():
return render_template('contact.html')
def redirect_to_home():
return redirect(url_for('.home'))
@bp.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm(next=request.args.get('next'))
if form.validate_on_submit():
login_user(form.user, remember=form.remember.data)
flash('Logged in successfully.')
return redirect(request.form.get('next') or url_for('.home'))
return render_template('login.html', form=form)
@bp.route('/logout')
def logout():
logout_user()
flash('You have been logged out.')
return redirect_to_home()
@bp.route('/signup', methods=['GET', 'POST'])
def signup():
form = SignupForm()
if not form.validate_on_submit():
return render_template('signup.html', form=form)
user = User(**form.data)
session.add(user)
session.commit()
flash('New account created.')
login_user(user)
return redirect(url_for(view_after_signup))
def redirect_to_doc(doc):
return redirect(url_for('.view_document', hashid=doc.hashid))
def get_source_doc(username, hashid):
doc = Item.get_by_hashid(hashid)
if doc and doc.user.username != username:
doc = None
return doc if doc else abort(404)
def get_item(username, hashid):
doc = Item.get_by_hashid(hashid)
if doc and doc.user.username != username:
doc = None
return doc if doc else abort(404)
@bp.route('/<username>/<hashid>/edl')
def view_edl(username, hashid):
item = get_item(username, hashid)
if item.type != 'xanadoc':
return abort(404)
return render_template('view.html',
doc=item,
iter_lines=iter_lines,
nbsp_at_start=nbsp_at_start)
@bp.route('/<username>/<hashid>/raw')
def view_item_raw(username, hashid):
return view_item(username, hashid, raw=True)
@bp.route('/<username>/<hashid>/fulfil')
def fulfil(username, hashid):
item = get_item(username, hashid)
if item.type != 'xanadoc':
return abort(404)
return render_template('view/xanadoc.html',
item=item,
doc=fulfil_edl_with_sources(item.text))
@bp.route('/<username>/<hashid>')
def view_item(username, hashid, raw=False):
if ',' in hashid:
m = re_spanpointer.match(hashid)
hashid, start, length = m.group(1), int(m.group(2)), int(m.group(3))
item = get_item(username, hashid)
if raw:
return Response(item.text[start:length + start], mimetype='text/plain')
else:
start, length = None, None
item = get_item(username, hashid)
if raw:
return Response(item.text, mimetype='text/plain')
return render_template('view.html',
doc=item,
span_start=start,
span_length=length,
add_highlight=add_highlight,
nbsp_at_start=nbsp_at_start,
iter_lines=iter_lines)
@bp.route('/<username>/<hashid>/edit', methods=['GET', 'POST'])
def edit_item(username, hashid):
obj = get_item(username, hashid)
form = SourceDocForm(obj=obj)
if form.validate_on_submit():
form.populate_obj(obj)
session.add(obj)
session.commit()
flash('Changes to {} saved.'.format(obj.type))
return redirect(obj.url)
return render_template('edit.html', form=form, doc=obj)
@bp.route('/source_doc_text/<int:source_doc_id>')
def source_doc_text(source_doc_id):
doc = SourceDoc.query.get(source_doc_id)
return render_template('source_doc_text.html', doc=doc, iter_lines=iter_lines)
@bp.route('/settings/account', methods=['GET', 'POST'])
@login_required
def account_settings():
form = AccountSettingsForm(obj=current_user)
if form.validate_on_submit():
form.populate_obj(current_user)
session.add(current_user)
session.commit()
flash('Account details updated.')
return redirect(url_for(request.endpoint))
return render_template('user/account.html', form=form)
@bp.route('/new/sourcedoc', methods=['GET', 'POST'])
@login_required
def new_sourcedoc():
form = SourceDocForm()
if form.validate_on_submit():
doc = SourceDoc(user=current_user)
form.populate_obj(doc)
session.add(doc)
session.commit()
flash('New document saved.')
return redirect(doc.url)
return render_template('new.html', form=form, title='source document')
@bp.route('/new/xanalink', methods=['GET', 'POST'])
@login_required
def new_xanalink():
form = ItemForm()
if form.validate_on_submit():
obj = XanaLink(user=current_user)
form.populate_obj(obj)
session.add(obj)
session.commit()
flash('New xanalink saved.')
return redirect(obj.url)
return render_template('new.html', form=form, title='xanalink')
@bp.route('/new/xanadoc', methods=['GET', 'POST'])
@login_required
def new_xanadoc():
form = ItemForm()
if form.validate_on_submit():
obj = XanaDoc(user=current_user)
form.populate_obj(obj)
session.add(obj)
session.commit()
flash('New xanadoc saved.')
return redirect(obj.url)
return render_template('new.html', form=form, title='xanadoc')
@bp.route('/edit/<filename>', methods=['GET', 'POST'])
@login_required
def edit_source_document(filename):
doc = get_source_doc(current_user.username, filename)
form = SourceDocForm(obj=doc)
if form.validate_on_submit():
form.populate_obj(doc)
session.add(doc)
session.commit()
flash('Changes to document saved.')
return redirect(doc.url)
return render_template('edit.html', form=form, doc=doc)
@bp.route('/api/1/get/<username>/<filename>')
def api_get_document(username, filename):
doc = get_source_doc(username, filename)
if not doc:
return abort(404)
ret = {
'username': username,
'filename': filename,
'character_count': len(doc.text),
'document_price': str(doc.document_price),
'price_per_character': str(doc.price_per_character),
}
return jsonify(ret)
@bp.route('/get_span.json')
def get_span():
url = request.args['url']
start = int(request.args['start'])
length = int(request.args['length'])
spanid = request.args['spanid']
text = get_url(url)
return jsonify(text=text[start:start + length], spanid=spanid)