sourcing/sourcing/model.py
2017-02-20 11:26:45 +00:00

246 lines
8 KiB
Python

from flask import url_for
from .database import session
from .parse import parse_link, parse_sourcedoc_facet, parse_span
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, ForeignKey
from sqlalchemy.types import String, Unicode, Integer, DateTime, Boolean, UnicodeText, Enum
from sqlalchemy import func
from sqlalchemy.orm import relationship, validates, synonym
from sqlalchemy.sql import exists
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy.ext.hybrid import hybrid_property
import re
from hashids import Hashids
# Hashids codec used to turn integer item ids into hashid strings of at
# least 8 characters (see Item.hashid) and back again.
doc_hashids = Hashids(min_length=8)
# Declarative base class that every model in this module derives from.
Base = declarative_base()
# Attach the session's query property so models can be queried as Model.query.
Base.query = session.query_property()
# Item URL on the perma.pub server; captures (username, hashid).
# Raw string so that \d is a regex escape rather than an invalid string
# escape, and the dot in the host name is matched literally.
re_server_url = re.compile(r'^http://perma\.pub/\d+/([^/]+)/([^/]+)$')

# list of disallowed usernames - maybe this should be in the database
reserved_name = [
    'root', 'admin', 'administrator', 'support', 'info',
    'test', 'tech', 'online', 'old', 'new', 'jobs', 'login', 'job', 'ipad',
    'iphone', 'javascript', 'script', 'host', 'mail', 'image', 'faq',
    'file', 'ftp', 'error', 'warning', 'the', 'assistance', 'maintenance',
    'controller', 'head', 'chief', 'anon',
]

# A username is one or more Unicode word characters.
re_username = re.compile(r'^\w+$', re.U)
# A full name is made of Unicode letters plus hyphen, dot, quotes and space.
re_full_name = re.compile(r'^([-.\'" ]|[^\W\d_])+$', re.U)
def user_exists(field, value):
    """Return whether any row has ``field`` equal to ``value``.

    ``field`` is a mapped column attribute (for example ``User.email``);
    the check is executed as a single EXISTS query on the shared session.
    """
    condition = exists().where(field == value)
    return session.query(condition).scalar()
class TimeStampedModel(Base):
    """Abstract base model that adds creation and modification timestamps."""

    __abstract__ = True

    # Filled in with the database's now() when the row is inserted.
    created = Column(DateTime, default=func.now())
    # Filled in on insert and refreshed with now() on every update.
    modified = Column(
        DateTime,
        default=func.now(),
        onupdate=func.now(),
    )
class LoginError(Exception):
    """Raised when a login attempt is rejected.

    The human-readable reason is available both as ``msg`` and through
    ``str(error)`` / ``error.args``.
    """

    def __init__(self, msg):
        # Forward the message to Exception so str() and args are populated;
        # without this the exception prints as an empty string.
        super(LoginError, self).__init__(msg)
        self.msg = msg
class User(TimeStampedModel, UserMixin):
    """A registered account; also serves as the Flask-Login user object.

    The password is never stored: the constructor and ``set_password``
    keep only a hash in ``pw_hash``.
    """

    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    username = Column(Unicode(32), unique=True, nullable=False)
    pw_hash = Column(String(160), nullable=False)
    email = Column(Unicode(64), unique=True, nullable=False)
    email_verified = Column(Boolean(), nullable=False, default=False)
    disabled = Column(Boolean(), nullable=False, default=False)
    deleted = Column(Boolean(), nullable=False, default=False)
    is_super = Column(Boolean, nullable=False, default=False)
    last_login = Column(DateTime)
    full_name = Column(Unicode(64))
    balance = Column(Integer, nullable=False, default=0)

    # Alternative attribute names for the columns above.
    user_id = synonym('id')
    name = synonym('full_name')
    user_name = synonym('username')

    def __init__(self, **kwargs):
        """Create a user from column values plus a required ``password``.

        Raises ``KeyError`` if ``password`` is not supplied.
        """
        pw_hash = generate_password_hash(kwargs.pop('password'))
        super(User, self).__init__(pw_hash=pw_hash, **kwargs)

    def __repr__(self):
        return '<User: {!r}>'.format(self.username)

    def set_password(self, password):
        """Replace the stored hash with the hash of ``password``."""
        self.pw_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash."""
        return check_password_hash(self.pw_hash, password)

    def get_id(self):
        """Return the identifier Flask-Login stores in the session."""
        # NOTE(review): Flask-Login documents get_id() as returning a str;
        # the integer id is kept here so existing callers are unaffected.
        return self.id

    # The validators raise AssertionError explicitly rather than using the
    # ``assert`` statement: the exception type seen by callers is the same,
    # but the checks are no longer stripped when Python runs with -O.

    @validates('email')
    def validate_email(self, key, value):
        """Reject e-mail addresses that do not contain ``@``."""
        if '@' not in value:
            raise AssertionError('invalid e-mail address')
        return value

    @validates('username')
    def validate_usernane(self, key, value):
        """Reject usernames that are not made of word characters only."""
        if not re_username.match(value):
            raise AssertionError('invalid username')
        return value

    @validates('full_name')
    def validate_full_name(self, key, value):
        """Reject non-empty full names containing disallowed characters."""
        if value and not re_full_name.match(value):
            raise AssertionError('invalid full name')
        return value

    @hybrid_property
    def is_live(self):
        """True when the e-mail is verified and the account is neither
        disabled nor deleted.

        On an instance this is a plain bool; on the class it is a SQL
        expression usable in query filters.
        """
        return bool(self.email_verified
                    and not self.disabled
                    and not self.deleted)

    @is_live.expression
    def is_live(cls):
        # SQL form of the same condition; ``&`` and ``~`` are only applied to
        # column expressions here, never to Python bools.
        return cls.email_verified & ~cls.disabled & ~cls.deleted

    @classmethod
    def lookup_user_or_email(cls, user_or_email):
        """Find a user by e-mail (if the value contains ``@``) or username.

        Returns the matching user or None.
        """
        field = cls.email if '@' in user_or_email else cls.username
        return cls.query.filter(field == user_or_email).one_or_none()

    @property
    def mail_to_name(self):
        '''Name to use on e-mails sent to the user.'''
        return self.full_name or self.username

    @classmethod
    def attempt_login(cls, user_or_email, password):
        """Return the user for the given credentials.

        Raises ``LoginError`` when the user is unknown, the account is
        disabled, or the password does not match.
        """
        user = cls.lookup_user_or_email(user_or_email)
        if not user:
            raise LoginError('user not found')
        if user.disabled:
            raise LoginError('user account disabled')
        if not user.check_password(password):
            raise LoginError('incorrect password')
        return user
class Item(TimeStampedModel):
    """Base row for every stored item; subclasses are selected by ``type``."""

    __tablename__ = 'item'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('user.id'))
    published = Column(DateTime)
    type = Column(Enum('sourcedoc', 'xanadoc', 'xanalink', name='item_type'),
                  nullable=False)
    filename = Column(Unicode)
    text = Column(UnicodeText)

    user = relationship('User', backref='items')

    __mapper_args__ = {
        'polymorphic_identity': 'item',
        'polymorphic_on': type,
    }

    @property
    def hashid(self):
        """The item id encoded as a hashid string."""
        return doc_hashids.encode(self.id)

    @classmethod
    def get_by_hashid(cls, hashid):
        """Return the item for ``hashid``, or None if it cannot be decoded."""
        try:
            item_id = doc_hashids.decode(hashid)[0]
        except IndexError:
            # decode() produced no values for this string.
            return None
        return cls.query.get(item_id)

    @property
    def url(self):
        """Relative URL of the item's view page."""
        return url_for('.view_item',
                       username=self.user.username,
                       hashid=self.hashid)

    @property
    def external_url(self):
        """Absolute URL of the item's view page."""
        return url_for('.view_item',
                       username=self.user.username,
                       hashid=self.hashid,
                       _external=True)

    @property
    def edit_url(self):
        """Relative URL of the item's edit page."""
        return url_for('.edit_item',
                       username=self.user.username,
                       hashid=self.hashid)

    def title(self, titles=None):
        """Return ``"<type>: <title>"``, falling back to the hashid.

        ``titles`` maps items to title strings; it is looked up with
        ``XanaLink.get_all_titles()`` only when not supplied. An empty
        mapping is used as given, matching ``XanaLink.title``.
        """
        if titles is None:
            titles = XanaLink.get_all_titles()
        return self.type + ": " + titles.get(self, self.hashid)

    @classmethod
    def from_external(cls, url):
        """Resolve an external item URL to the item it names.

        Accepts URLs under this application's home URL
        (``<home><username>/<hashid>``) as well as perma.pub server URLs.
        Returns None when the URL is not recognised, the hashid cannot be
        decoded, or no such item exists for that user.
        """
        home = url_for('.home', _external=True)
        if url.startswith(home):
            # Local URL: everything after the home prefix is username/hashid.
            username, _, hashid = url[len(home):].partition('/')
        else:
            m = re_server_url.match(url)
            if not m:
                return None
            username, hashid = m.groups()

        try:
            item_id = doc_hashids.decode(hashid)[0]
        except IndexError:
            # Same handling as get_by_hashid: an undecodable hashid is simply
            # "not found" rather than an error.
            return None

        # The user_id condition ties the User row to the item; without it
        # the username would only have to exist, not own the item.
        q = cls.query.filter(cls.user_id == User.id,
                             User.username == username,
                             cls.id == item_id)
        return q.one_or_none()
class XanaDoc(Item):
    """Item subclass stored in the ``xanadoc`` table, joined to ``item`` by id."""

    __tablename__ = 'xanadoc'
    __mapper_args__ = dict(polymorphic_identity='xanadoc')

    # Primary key shared with the parent ``item`` row.
    id = Column(Integer, ForeignKey('item.id'), primary_key=True)
class XanaLink(Item):
    """Item subclass whose ``text`` is a link description understood by
    ``parse_link``."""

    __tablename__ = 'xanalink'

    id = Column(Integer, ForeignKey('item.id'), primary_key=True)

    __mapper_args__ = {'polymorphic_identity': 'xanalink'}

    def parse(self):
        """Return the parsed form of this link's text."""
        return parse_link(self.text)

    def title(self, titles=None):
        """Return a display title for this link.

        Uses the link's own entry in ``titles`` when present. For a link of
        type ``title`` whose target item has a title, returns
        ``"title link for <target title>"``. Otherwise falls back to
        ``"<link type> link: <hashid>"``. ``titles`` is fetched with
        ``get_all_titles()`` when not supplied.
        """
        if titles is None:
            titles = XanaLink.get_all_titles()
        if self in titles:
            return self.type + ": " + titles[self]

        parsed = self.parse()
        if parsed['type'] == 'title':
            ident = parsed['facets'][0][0].partition(': ')[2]
            item = Item.from_external(ident)
            # from_external returns None for unresolvable URLs; never treat
            # that as a titled item, even if a caller's mapping has a None key.
            if item is not None and item in titles:
                return parsed['type'] + " link for " + item.title(titles=titles)

        return parsed['type'] + " link: " + self.hashid

    @classmethod
    def get_all_titles(cls):
        """Build a mapping of item -> title text from every title link.

        For each link of type ``title`` the first facet names the titled
        item and the second facet is a span (document, start, length) in a
        source document that supplies the title text. Links whose titled
        item or source document cannot be resolved are skipped.
        """
        titles = {}
        for obj in cls.query:
            link = obj.parse()
            if link['type'] != 'title':
                continue

            facet1, facet2 = link['facets']
            ident = facet1[0].partition(': ')[2]
            item = Item.from_external(ident)
            if item is None:
                # Nothing to attach the title to; storing it under a None
                # key would break title() later.
                continue

            ident2, start, length = parse_span(facet2[0])
            source_of_title = SourceDoc.from_external(ident2)
            if source_of_title:
                titles[item] = source_of_title.text[start:start + length]
        return titles
class SourceDoc(Item):
    """Item subclass for source documents, priced either per document or
    per character."""

    __tablename__ = 'sourcedoc'

    id = Column(Integer, ForeignKey('item.id'), primary_key=True)
    db_price_per_character = Column(Integer)
    db_document_price = Column(Integer)

    __mapper_args__ = {'polymorphic_identity': 'sourcedoc'}

    @property
    def document_price(self):
        """Price of the whole document.

        Uses the stored document price when it is set and non-zero,
        otherwise the per-character price multiplied by the text length.
        When that cannot be computed (no per-character price or no text)
        the stored document price is returned as is, i.e. ``None`` or ``0``.
        """
        if self.db_document_price:
            return self.db_document_price
        if self.db_price_per_character is None or self.text is None:
            # Previously a TypeError; report "no price" instead.
            return self.db_document_price
        return self.db_price_per_character * len(self.text)

    @property
    def price_per_character(self):
        """Price of a single character.

        Uses the stored per-character price when it is set and non-zero,
        otherwise the document price divided by the text length. When that
        cannot be computed (no document price, or missing/empty text) the
        stored per-character price is returned as is, i.e. ``None`` or ``0``.
        """
        if self.db_price_per_character:
            return self.db_price_per_character
        if self.db_document_price is None or not self.text:
            # Previously a TypeError or ZeroDivisionError.
            return self.db_price_per_character
        return self.db_document_price / len(self.text)