sourcing/sourcing/model.py
2017-06-07 14:25:27 +01:00

296 lines
9.5 KiB
Python

from flask import url_for
from .database import session
from .parse import parse_link, parse_sourcedoc_facet, parse_span
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, ForeignKey
from sqlalchemy.types import String, Unicode, Integer, DateTime, Boolean, UnicodeText, Enum
from sqlalchemy import func
from sqlalchemy.orm import relationship, validates, synonym, configure_mappers
from sqlalchemy.sql import exists
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy_continuum import make_versioned
from sqlalchemy_continuum.plugins import FlaskPlugin, ActivityPlugin
import re
from hashids import Hashids
# Single ActivityPlugin instance, bound to a module-level name and handed to
# make_versioned() below.
activity_plugin = ActivityPlugin()
# Switch on SQLAlchemy-Continuum versioning with the Flask and activity plugins.
# NOTE(review): presumably this has to run before the versioned models in this
# module are declared - confirm against the SQLAlchemy-Continuum docs.
make_versioned(plugins=[FlaskPlugin(), activity_plugin])
# Encodes integer item ids as hashid strings of at least 8 characters (and
# decodes them again); see Item.hashid / Item.get_by_hashid.
doc_hashids = Hashids(min_length=8)
# Declarative base class that every model in this module derives from.
Base = declarative_base()
# Give every model a `.query` attribute obtained from the session.
# NOTE(review): assumes `session` is a scoped_session - confirm in .database.
Base.query = session.query_property()
# Matches an item URL on the perma.pub server and captures
# (username, hashid).  Written as a raw string so the regex escapes are not
# read as (invalid) Python string escapes, and with the dot of the host name
# escaped so that it only matches a literal '.'.
re_server_url = re.compile(r'^http://perma\.pub/\d+/([^/]+)/([^/]+)$')

# list of disallowed usernames - maybe this should be in the database
reserved_name = [
    'root', 'admin', 'administrator', 'support', 'info',
    'test', 'tech', 'online', 'old', 'new', 'jobs', 'login', 'job',
    # A comma was missing after 'ipad', which silently concatenated it with
    # the next literal into the single entry 'ipadiphone'.
    'ipad',
    'iphone', 'javascript', 'script', 'host', 'mail', 'image', 'faq',
    'file', 'ftp', 'error', 'warning', 'the', 'assistance', 'maintenance',
    'controller', 'head', 'chief', 'anon',
]

# A username is one or more Unicode word characters.
re_username = re.compile(r'^\w+$', re.U)
# A full name is made of letters plus hyphen, dot, quotes and spaces
# (no digits, no underscore).
re_full_name = re.compile(r"""^([-.'" ]|[^\W\d_])+$""", re.U)
def user_exists(field, value):
    """Return whether any row has ``field`` equal to ``value``.

    ``field`` is compared to ``value`` with ``==`` and the comparison is
    wrapped in an ``EXISTS`` clause that is evaluated through the module's
    session; the scalar result of that query is returned.
    """
    match_clause = exists().where(field == value)
    return session.query(match_clause).scalar()
class TimeStampedModel(Base):
    """Abstract base model contributing ``created`` / ``modified`` columns."""

    # Abstract: this class gets no table of its own.
    __abstract__ = True

    # Defaults to the database's now() when the row is inserted.
    created = Column(DateTime, default=func.now())
    # Defaults to now() on insert and is set to now() again on every update.
    modified = Column(DateTime, onupdate=func.now(), default=func.now())
class LoginError(Exception):
    """Raised when a login attempt is rejected.

    Attributes:
        msg: human-readable reason for the failure.
    """

    def __init__(self, msg):
        # Forward the message to Exception so that ``str(error)`` and
        # ``error.args`` carry it even when ``msg`` is passed by keyword.
        super(LoginError, self).__init__(msg)
        self.msg = msg
class User(TimeStampedModel, UserMixin):
    """A user account, stored in the ``user`` table.

    Passwords are never stored directly: the constructor and
    :meth:`set_password` keep only a werkzeug password hash in ``pw_hash``.
    """

    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    username = Column(Unicode(32), unique=True, nullable=False)
    pw_hash = Column(String(160), nullable=False)
    email = Column(Unicode(64), unique=True, nullable=False)
    email_verified = Column(Boolean(), nullable=False, default=False)
    disabled = Column(Boolean(), nullable=False, default=False)
    deleted = Column(Boolean(), nullable=False, default=False)
    is_super = Column(Boolean, nullable=False, default=False)
    last_login = Column(DateTime)
    full_name = Column(Unicode(64))
    balance = Column(Integer, nullable=False, default=0)

    # Alternative attribute names for the columns above.
    user_id = synonym('id')
    name = synonym('full_name')
    user_name = synonym('username')

    def __init__(self, **kwargs):
        """Create a user from column values plus a plain-text ``password``.

        The ``password`` keyword is required (``KeyError`` if missing); it is
        hashed and stored as ``pw_hash``.  All remaining keywords are passed
        on to the declarative constructor.
        """
        pw_hash = generate_password_hash(kwargs.pop('password'))
        super(User, self).__init__(pw_hash=pw_hash, **kwargs)

    def __repr__(self):
        return '<User: {!r}>'.format(self.username)

    def set_password(self, password):
        """Replace the stored hash with the hash of ``password``."""
        self.pw_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash."""
        return check_password_hash(self.pw_hash, password)

    def get_id(self):
        """Return the primary key (overrides ``UserMixin.get_id``)."""
        return self.id

    # The validators below raise AssertionError explicitly instead of using
    # the ``assert`` statement: ``assert`` is removed when Python runs with
    # -O, which would silently disable validation.  The exception type is
    # kept so existing callers that catch AssertionError still work.

    @validates('email')
    def validate_email(self, key, value):
        """Reject e-mail addresses that do not contain an ``@``."""
        if '@' not in value:
            raise AssertionError('invalid e-mail address: {!r}'.format(value))
        return value

    @validates('username')
    def validate_usernane(self, key, value):
        """Reject usernames that do not match ``re_username``."""
        if not re_username.match(value):
            raise AssertionError('invalid username: {!r}'.format(value))
        return value

    @validates('full_name')
    def validate_full_name(self, key, value):
        """Reject non-empty full names that do not match ``re_full_name``."""
        if value and not re_full_name.match(value):
            raise AssertionError('invalid full name: {!r}'.format(value))
        return value

    @hybrid_property
    def is_live(self):
        """True when the e-mail is verified and the account is neither
        disabled nor deleted.

        On an instance this is evaluated with plain boolean logic, so it
        returns a real ``bool`` and tolerates columns that are still
        ``None``; on the class it yields the equivalent SQL expression.
        """
        return bool(self.email_verified
                    and not self.disabled
                    and not self.deleted)

    @is_live.expression
    def is_live(cls):
        # SQL-side form, usable inside query filters.
        return cls.email_verified & ~cls.disabled & ~cls.deleted

    @classmethod
    def lookup_user_or_email(cls, user_or_email):
        """Find a user by e-mail (if the text contains ``@``) or by username.

        Returns the matching user or ``None``.
        """
        field = cls.email if '@' in user_or_email else cls.username
        return cls.query.filter(field == user_or_email).one_or_none()

    @property
    def mail_to_name(self):
        '''Name to use on e-mails sent to the user.'''
        return self.full_name or self.username

    @classmethod
    def attempt_login(cls, user_or_email, password):
        """Return the user for the given credentials.

        Raises:
            LoginError: if no such user exists, the account is disabled, or
                the password does not match.
        """
        # NOTE(review): ``deleted`` and ``email_verified`` are not checked
        # here - confirm that this is intended.
        user = cls.lookup_user_or_email(user_or_email)
        if not user:
            raise LoginError('user not found')
        if user.disabled:
            raise LoginError('user account disabled')
        if not user.check_password(password):
            raise LoginError('incorrect password')
        return user
class Item(TimeStampedModel):
    """Versioned base model for sourcedocs, xanadocs and xanalinks.

    The ``type`` column is the polymorphic discriminator; each concrete
    subclass declares its own ``polymorphic_identity``.
    """

    __tablename__ = 'item'
    __versioned__ = {'base_classes': (TimeStampedModel,)}

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('user.id'))
    published = Column(DateTime)
    type = Column(Enum('sourcedoc', 'xanadoc', 'xanalink', name='item_type'),
                  nullable=False)
    filename = Column(Unicode)
    text = Column(UnicodeText)

    user = relationship('User', backref='items')

    __mapper_args__ = {
        'polymorphic_on': type,
        'with_polymorphic': '*',
    }

    @property
    def hashid(self):
        """The item id encoded as a hashid string."""
        return doc_hashids.encode(self.id)

    @classmethod
    def get_by_hashid(cls, hashid):
        """Return the item for ``hashid``, or ``None`` if it cannot be decoded
        or no such item exists."""
        try:
            item_id = doc_hashids.decode(hashid)[0]
        except IndexError:
            # decode() produced nothing: not a valid hashid.
            return None
        return cls.query.get(item_id)

    def _url_for(self, endpoint, **values):
        """Build a URL for ``endpoint`` identified by owner username and hashid."""
        return url_for(endpoint,
                       username=self.user.username,
                       hashid=self.hashid,
                       **values)

    @property
    def url(self):
        return self._url_for('.view_item')

    def version_url(self, version):
        """URL of the item view with ``v`` set to ``version``."""
        return self._url_for('.view_item', v=version)

    @property
    def history_url(self):
        return self._url_for('.history')

    @property
    def external_url(self):
        """Absolute (``_external=True``) URL of the item view."""
        return self._url_for('.view_item', _external=True)

    @property
    def edit_url(self):
        return self._url_for('.edit_item')

    @property
    def set_title_url(self):
        return self._url_for('.set_title')

    def title(self, titles=None):
        """Return ``"<type>: <title>"``, using the hashid when untitled.

        ``titles`` is an optional mapping of item to title as produced by
        ``XanaLink.get_all_titles()``; it is only looked up when omitted, so
        an empty mapping passed by the caller is respected.
        """
        if titles is None:
            titles = XanaLink.get_all_titles()
        return self.type + ": " + titles.get(self, self.hashid)

    def has_title(self):
        """Return True if some title link gives this item a title."""
        titles = XanaLink.get_all_titles()
        return self in titles

    def title_from_link(self):
        """Return this item's title taken from title links, or ``None``."""
        return XanaLink.get_all_titles().get(self)

    def set_title(self, title, user):
        """Give this item a title on behalf of ``user``.

        Creates a SourceDoc holding the title text and a XanaLink of type
        ``title`` connecting this item to a span covering that whole text.
        """
        title_source_doc = SourceDoc(text=title, user=user)
        session.add(title_source_doc)
        # Commit first: the new document needs an id before its URL can be
        # built (the hashid is derived from the id).
        session.commit()

        link_text = (
            'type=title\n'
            'facet=\n'
            'sourcedoc: {}\n'
            'facet=\n'
            'span: {},start=0,length={}'
        ).format(self.external_url, title_source_doc.external_url, len(title))
        title_link = XanaLink(text=link_text, user=user)
        session.add(title_link)
        session.commit()

    @classmethod
    def from_external(cls, url):
        """Resolve an external URL to an item of this class.

        Accepts URLs under this application's home URL and URLs matching
        ``re_server_url``.  Returns ``None`` when the URL is not recognised,
        the hashid cannot be decoded, or no item with that id is owned by the
        named user.
        """
        home = url_for('.home', _external=True)
        if url.startswith(home):
            username, _, hashid = url[len(home):].partition('/')
        else:
            m = re_server_url.match(url)
            if not m:
                return None
            username, hashid = m.groups()

        try:
            item_id = doc_hashids.decode(hashid)[0]
        except IndexError:
            # Same policy as get_by_hashid(): an invalid hashid is "not found".
            return None

        # Join through the owner relationship so the username actually
        # constrains the item; filtering on User without a join is a
        # cartesian product and matched items owned by anybody.
        q = (cls.query
             .join(cls.user)
             .filter(User.username == username, cls.id == item_id))
        return q.one_or_none()
class XanaDoc(Item):
    """Item subclass mapped with the ``'xanadoc'`` polymorphic identity."""

    __mapper_args__ = dict(polymorphic_identity='xanadoc')
    __tablename__ = 'xanadoc'

    # Primary key of the ``xanadoc`` table, also a foreign key to Item.id.
    id = Column(Integer, ForeignKey(Item.id), primary_key=True)
class XanaLink(Item):
    """Item subclass mapped with the ``'xanalink'`` polymorphic identity.

    The link definition lives in ``text`` and is parsed with ``parse_link``;
    the code below reads the parsed result as a mapping with a ``'type'``
    entry and a ``'facets'`` sequence.
    """

    __tablename__ = 'xanalink'
    __mapper_args__ = {'polymorphic_identity': 'xanalink'}

    id = Column(Integer, ForeignKey(Item.id), primary_key=True)

    def parse(self):
        """Return the parsed form of this link's text."""
        return parse_link(self.text)

    def title(self, titles=None):
        """Return a display title for this link.

        Order of preference: the link's own title, a description of the item
        a ``title`` link points at (when that item has a title), the link
        type plus hashid, and finally just the hashid.

        ``titles`` is an optional item-to-title mapping as returned by
        :meth:`get_all_titles`; it is looked up when omitted.
        """
        if titles is None:
            titles = XanaLink.get_all_titles()
        if self in titles:
            return self.type + ": " + titles[self]

        parsed = self.parse()
        if parsed['type'] == 'title':
            ident = parsed['facets'][0][0].partition(': ')[2]
            item = Item.from_external(ident)
            # The target may not resolve to an item; never call .title() on None.
            if item is not None and item in titles:
                return parsed['type'] + " link for " + item.title(titles=titles)

        if parsed['type']:
            return parsed['type'] + " link: " + self.hashid
        return "link: " + self.hashid

    @classmethod
    def get_all_titles(cls):
        """Return a dict mapping each titled item to its title text.

        Every link of type ``title`` is expected to have exactly two facets:
        the first names the titled item, the second is a span into the
        source document that holds the title.  Links whose target item or
        source document cannot be resolved are skipped.
        """
        titles = {}
        for obj in cls.query:
            link = obj.parse()
            if link['type'] != 'title':
                continue
            facet1, facet2 = link['facets']
            ident = facet1[0].partition(': ')[2]
            item = Item.from_external(ident)
            if item is None:
                # Unknown target: do not record a title under the key None.
                continue
            ident2, start, length = parse_span(facet2[0])
            source_of_title = SourceDoc.from_external(ident2)
            if source_of_title:
                titles[item] = source_of_title.text[start:length + start]
        return titles
class SourceDoc(Item):
    """Item subclass mapped with the ``'sourcedoc'`` polymorphic identity.

    A price can be stored per character and/or for the whole document; the
    two properties derive whichever value is not stored from the other one
    and the length of ``text``.
    """

    __tablename__ = 'sourcedoc'
    __mapper_args__ = {'polymorphic_identity': 'sourcedoc'}

    id = Column(Integer, ForeignKey(Item.id), primary_key=True)
    db_price_per_character = Column(Integer)
    db_document_price = Column(Integer)

    @property
    def document_price(self):
        """Price of the whole document.

        Returns the stored document price when it is non-zero, otherwise the
        per-character price multiplied by the text length.  When no
        per-character price is stored either, the stored document price
        (``0`` or ``None``) is returned instead of raising ``TypeError``.
        """
        if self.db_document_price:
            return self.db_document_price
        if self.db_price_per_character is None:
            return self.db_document_price
        # A missing text counts as zero characters.
        return self.db_price_per_character * len(self.text or '')

    @property
    def price_per_character(self):
        """Price of a single character.

        Returns the stored per-character price when it is non-zero, otherwise
        the document price divided by the text length.  When that cannot be
        computed (no document price, or no/empty text) the stored
        per-character price (``0`` or ``None``) is returned instead of
        raising ``TypeError`` / ``ZeroDivisionError``.
        """
        if self.db_price_per_character:
            return self.db_price_per_character
        if self.db_document_price is None or not self.text:
            return self.db_price_per_character
        return self.db_document_price / len(self.text)
# Configure all mappers declared above now, at import time, rather than
# lazily on first use.
# NOTE(review): presumably needed so SQLAlchemy-Continuum can build its
# version classes - confirm against the Continuum docs.
configure_mappers()