from datetime import datetime, timedelta, date, timezone from time import time from typing import List, Union import requests from flask import current_app, escape, url_for, render_template_string from flask_login import UserMixin, current_user from sqlalchemy import or_, text, desc from werkzeug.security import generate_password_hash, check_password_hash from flask_babel import _, lazy_gettext as _l from sqlalchemy.orm import backref from sqlalchemy_utils.types import TSVectorType # https://sqlalchemy-searchable.readthedocs.io/en/latest/installation.html from sqlalchemy.dialects.postgresql import ARRAY from sqlalchemy.ext.mutable import MutableList from flask_sqlalchemy import BaseQuery from sqlalchemy_searchable import SearchQueryMixin from app import db, login, cache, celery import jwt import os import math from app.constants import SUBSCRIPTION_NONMEMBER, SUBSCRIPTION_MEMBER, SUBSCRIPTION_MODERATOR, SUBSCRIPTION_OWNER, \ SUBSCRIPTION_BANNED, SUBSCRIPTION_PENDING, NOTIF_USER, NOTIF_COMMUNITY, NOTIF_TOPIC, NOTIF_POST, NOTIF_REPLY, \ ROLE_ADMIN, ROLE_STAFF # datetime.utcnow() is depreciated in Python 3.12 so it will need to be swapped out eventually def utcnow(): return datetime.utcnow() class FullTextSearchQuery(BaseQuery, SearchQueryMixin): pass class BannedInstances(db.Model): id = db.Column(db.Integer, primary_key=True) domain = db.Column(db.String(256), index=True) reason = db.Column(db.String(256)) initiator = db.Column(db.String(256)) created_at = db.Column(db.DateTime, default=utcnow) class AllowedInstances(db.Model): id = db.Column(db.Integer, primary_key=True) domain = db.Column(db.String(256), index=True) created_at = db.Column(db.DateTime, default=utcnow) class Instance(db.Model): id = db.Column(db.Integer, primary_key=True) domain = db.Column(db.String(256), index=True) inbox = db.Column(db.String(256)) shared_inbox = db.Column(db.String(256)) outbox = db.Column(db.String(256)) vote_weight = db.Column(db.Float, default=1.0) software = db.Column(db.String(50)) version = db.Column(db.String(50)) created_at = db.Column(db.DateTime, default=utcnow) updated_at = db.Column(db.DateTime, default=utcnow) last_seen = db.Column(db.DateTime, default=utcnow) # When an Activity was received from them last_successful_send = db.Column(db.DateTime) # When we successfully sent them an Activity failures = db.Column(db.Integer, default=0) # How many times we failed to send (reset to 0 after every successful send) most_recent_attempt = db.Column(db.DateTime) # When the most recent failure was dormant = db.Column(db.Boolean, default=False) # True once this instance is considered offline and not worth sending to any more start_trying_again = db.Column(db.DateTime) # When to start trying again. Should grow exponentially with each failure. gone_forever = db.Column(db.Boolean, default=False) # True once this instance is considered offline forever - never start trying again ip_address = db.Column(db.String(50)) trusted = db.Column(db.Boolean, default=False) posting_warning = db.Column(db.String(512)) nodeinfo_href = db.Column(db.String(100)) posts = db.relationship('Post', backref='instance', lazy='dynamic') post_replies = db.relationship('PostReply', backref='instance', lazy='dynamic') communities = db.relationship('Community', backref='instance', lazy='dynamic') def online(self): return not self.dormant and not self.gone_forever def user_is_admin(self, user_id): role = InstanceRole.query.filter_by(instance_id=self.id, user_id=user_id).first() return role and role.role == 'admin' class InstanceRole(db.Model): instance_id = db.Column(db.Integer, db.ForeignKey('instance.id'), primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) role = db.Column(db.String(50), default='admin') user = db.relationship('User', lazy='joined') class InstanceBlock(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) instance_id = db.Column(db.Integer, db.ForeignKey('instance.id'), primary_key=True) created_at = db.Column(db.DateTime, default=utcnow) class Conversation(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) reported = db.Column(db.Boolean, default=False) read = db.Column(db.Boolean, default=False) created_at = db.Column(db.DateTime, default=utcnow) updated_at = db.Column(db.DateTime, default=utcnow) initiator = db.relationship('User', backref=db.backref('conversations_initiated', lazy='dynamic'), foreign_keys=[user_id]) messages = db.relationship('ChatMessage', backref=db.backref('conversation'), cascade='all,delete', lazy='dynamic') def member_names(self, user_id): retval = [] for member in self.members: if member.id != user_id: retval.append(member.display_name()) return ', '.join(retval) def is_member(self, user): for member in self.members: if member.id == user.id: return True return False def instances(self): retval = [] for member in self.members: if member.instance.id != 1 and member.instance not in retval: retval.append(member.instance) return retval @staticmethod def find_existing_conversation(recipient, sender): sql = """SELECT c.id AS conversation_id, c.created_at AS conversation_created_at, c.updated_at AS conversation_updated_at, cm1.user_id AS user1_id, cm2.user_id AS user2_id FROM public.conversation AS c JOIN public.conversation_member AS cm1 ON c.id = cm1.conversation_id JOIN public.conversation_member AS cm2 ON c.id = cm2.conversation_id WHERE cm1.user_id = :user_id_1 AND cm2.user_id = :user_id_2 AND cm1.user_id <> cm2.user_id;""" ec = db.session.execute(text(sql), {'user_id_1': recipient.id, 'user_id_2': sender.id}).fetchone() return Conversation.query.get(ec[0]) if ec else None conversation_member = db.Table('conversation_member', db.Column('user_id', db.Integer, db.ForeignKey('user.id')), db.Column('conversation_id', db.Integer, db.ForeignKey('conversation.id')), db.PrimaryKeyConstraint('user_id', 'conversation_id') ) class ChatMessage(db.Model): id = db.Column(db.Integer, primary_key=True) sender_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) recipient_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) conversation_id = db.Column(db.Integer, db.ForeignKey('conversation.id'), index=True) body = db.Column(db.Text) body_html = db.Column(db.Text) reported = db.Column(db.Boolean, default=False) read = db.Column(db.Boolean, default=False) encrypted = db.Column(db.String(15)) created_at = db.Column(db.DateTime, default=utcnow) sender = db.relationship('User', foreign_keys=[sender_id]) class Tag(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(256), index=True) # lowercase version of tag, e.g. solarstorm display_as = db.Column(db.String(256)) # Version of tag with uppercase letters, e.g. SolarStorm post_count = db.Column(db.Integer, default=0) banned = db.Column(db.Boolean, default=False, index=True) class Language(db.Model): id = db.Column(db.Integer, primary_key=True) code = db.Column(db.String(5), index=True) name = db.Column(db.String(50)) community_language = db.Table('community_language', db.Column('community_id', db.Integer, db.ForeignKey('community.id')), db.Column('language_id', db.Integer, db.ForeignKey('language.id')), db.PrimaryKeyConstraint('community_id', 'language_id') ) post_tag = db.Table('post_tag', db.Column('post_id', db.Integer, db.ForeignKey('post.id')), db.Column('tag_id', db.Integer, db.ForeignKey('tag.id')), db.PrimaryKeyConstraint('post_id', 'tag_id') ) class File(db.Model): id = db.Column(db.Integer, primary_key=True) file_path = db.Column(db.String(255)) file_name = db.Column(db.String(255)) width = db.Column(db.Integer) height = db.Column(db.Integer) alt_text = db.Column(db.String(1500)) source_url = db.Column(db.String(1024)) thumbnail_path = db.Column(db.String(255)) thumbnail_width = db.Column(db.Integer) thumbnail_height = db.Column(db.Integer) def view_url(self, resize=False): if self.source_url: if resize and '/pictrs/' in self.source_url and '?' not in self.source_url: return f'{self.source_url}?thumbnail=1024' else: return self.source_url elif self.file_path: file_path = self.file_path[4:] if self.file_path.startswith('app/') else self.file_path return f"https://{current_app.config['SERVER_NAME']}/{file_path}" else: return '' def medium_url(self): if self.file_path is None: return self.thumbnail_url() file_path = self.file_path[4:] if self.file_path.startswith('app/') else self.file_path return f"https://{current_app.config['SERVER_NAME']}/{file_path}" def thumbnail_url(self): if self.thumbnail_path is None: if self.source_url: return self.source_url else: return '' thumbnail_path = self.thumbnail_path[4:] if self.thumbnail_path.startswith('app/') else self.thumbnail_path return f"https://{current_app.config['SERVER_NAME']}/{thumbnail_path}" def delete_from_disk(self): purge_from_cache = [] if self.file_path and os.path.isfile(self.file_path): try: os.unlink(self.file_path) except FileNotFoundError as e: ... purge_from_cache.append(self.file_path.replace('app/', f"https://{current_app.config['SERVER_NAME']}/")) if self.thumbnail_path and os.path.isfile(self.thumbnail_path): try: os.unlink(self.thumbnail_path) except FileNotFoundError as e: ... purge_from_cache.append(self.thumbnail_path.replace('app/', f"https://{current_app.config['SERVER_NAME']}/")) if self.source_url and self.source_url.startswith('http') and current_app.config['SERVER_NAME'] in self.source_url: # self.source_url is always a url rather than a file path, which makes deleting the file a bit fiddly try: os.unlink(self.source_url.replace(f"https://{current_app.config['SERVER_NAME']}/", 'app/')) except FileNotFoundError as e: ... purge_from_cache.append(self.source_url) # otoh it makes purging the cdn cache super easy. if purge_from_cache: flush_cdn_cache(purge_from_cache) def filesize(self): size = 0 if self.file_path and os.path.exists(self.file_path): size += os.path.getsize(self.file_path) if self.thumbnail_path and os.path.exists(self.thumbnail_path): size += os.path.getsize(self.thumbnail_path) return size def flush_cdn_cache(url: Union[str, List[str]]): zone_id = current_app.config['CLOUDFLARE_ZONE_ID'] token = current_app.config['CLOUDFLARE_API_TOKEN'] if zone_id and token: if current_app.debug: flush_cdn_cache_task(url) else: flush_cdn_cache_task.delay(url) @celery.task def flush_cdn_cache_task(to_purge: Union[str, List[str]]): zone_id = current_app.config['CLOUDFLARE_ZONE_ID'] token = current_app.config['CLOUDFLARE_API_TOKEN'] headers = { 'Authorization': f"Bearer {token}", 'Content-Type': 'application/json' } # url can be a string or a list of strings body = '' if isinstance(to_purge, str) and to_purge == 'all': body = { 'purge_everything': True } else: if isinstance(to_purge, str): body = { 'files': [to_purge] } elif isinstance(to_purge, list): body = { 'files': to_purge } if body: response = requests.request( 'POST', f'https://api.cloudflare.com/client/v4/zones/{zone_id}/purge_cache', headers=headers, json=body, timeout=5, ) class Topic(db.Model): id = db.Column(db.Integer, primary_key=True) machine_name = db.Column(db.String(50), index=True) name = db.Column(db.String(50)) num_communities = db.Column(db.Integer, default=0) parent_id = db.Column(db.Integer) communities = db.relationship('Community', lazy='dynamic', backref='topic', cascade="all, delete-orphan") def path(self): return_value = [self.machine_name] parent_id = self.parent_id while parent_id is not None: parent_topic = Topic.query.get(parent_id) if parent_topic is None: break return_value.append(parent_topic.machine_name) parent_id = parent_topic.parent_id return_value = list(reversed(return_value)) return '/'.join(return_value) def notify_new_posts(self, user_id: int) -> bool: existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_TOPIC).first() return existing_notification is not None class Community(db.Model): query_class = FullTextSearchQuery id = db.Column(db.Integer, primary_key=True) icon_id = db.Column(db.Integer, db.ForeignKey('file.id')) image_id = db.Column(db.Integer, db.ForeignKey('file.id')) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) name = db.Column(db.String(256), index=True) title = db.Column(db.String(256)) description = db.Column(db.Text) # markdown description_html = db.Column(db.Text) # html equivalent of above markdown rules = db.Column(db.Text) rules_html = db.Column(db.Text) content_warning = db.Column(db.Text) # "Are you sure you want to view this community?" subscriptions_count = db.Column(db.Integer, default=0) post_count = db.Column(db.Integer, default=0) post_reply_count = db.Column(db.Integer, default=0) nsfw = db.Column(db.Boolean, default=False) nsfl = db.Column(db.Boolean, default=False) instance_id = db.Column(db.Integer, db.ForeignKey('instance.id'), index=True) low_quality = db.Column(db.Boolean, default=False) # upvotes earned in low quality communities don't improve reputation created_at = db.Column(db.DateTime, default=utcnow) last_active = db.Column(db.DateTime, default=utcnow) public_key = db.Column(db.Text) private_key = db.Column(db.Text) content_retention = db.Column(db.Integer, default=-1) topic_id = db.Column(db.Integer, db.ForeignKey('topic.id'), index=True) default_layout = db.Column(db.String(15)) posting_warning = db.Column(db.String(512)) ap_id = db.Column(db.String(255), index=True) ap_profile_id = db.Column(db.String(255), index=True) ap_followers_url = db.Column(db.String(255)) ap_preferred_username = db.Column(db.String(255)) ap_discoverable = db.Column(db.Boolean, default=False) ap_public_url = db.Column(db.String(255)) ap_fetched_at = db.Column(db.DateTime) ap_deleted_at = db.Column(db.DateTime) ap_inbox_url = db.Column(db.String(255)) ap_outbox_url = db.Column(db.String(255)) ap_featured_url = db.Column(db.String(255)) ap_moderators_url = db.Column(db.String(255)) ap_domain = db.Column(db.String(255)) banned = db.Column(db.Boolean, default=False) restricted_to_mods = db.Column(db.Boolean, default=False) local_only = db.Column(db.Boolean, default=False) # only users on this instance can post new_mods_wanted = db.Column(db.Boolean, default=False) searchable = db.Column(db.Boolean, default=True) private_mods = db.Column(db.Boolean, default=False) # Which feeds posts from this community show up in show_home = db.Column(db.Boolean, default=False) # For anonymous users. When logged in, the home feed shows posts from subscribed communities show_popular = db.Column(db.Boolean, default=True) show_all = db.Column(db.Boolean, default=True) ignore_remote_language = db.Column(db.Boolean, default=False) search_vector = db.Column(TSVectorType('name', 'title', 'description', 'rules')) posts = db.relationship('Post', lazy='dynamic', cascade="all, delete-orphan") replies = db.relationship('PostReply', lazy='dynamic', cascade="all, delete-orphan") wiki_pages = db.relationship('CommunityWikiPage', lazy='dynamic', backref='community', cascade="all, delete-orphan") icon = db.relationship('File', foreign_keys=[icon_id], single_parent=True, backref='community', cascade="all, delete-orphan") image = db.relationship('File', foreign_keys=[image_id], single_parent=True, cascade="all, delete-orphan") languages = db.relationship('Language', lazy='dynamic', secondary=community_language, backref=db.backref('communities', lazy='dynamic')) def language_ids(self): return [language.id for language in self.languages.all()] @cache.memoize(timeout=500) def icon_image(self, size='default') -> str: if self.icon_id is not None: if size == 'default': if self.icon.file_path is not None: if self.icon.file_path.startswith('app/'): return self.icon.file_path.replace('app/', '/') else: return self.icon.file_path if self.icon.source_url is not None: if self.icon.source_url.startswith('app/'): return self.icon.source_url.replace('app/', '/') else: return self.icon.source_url elif size == 'tiny': if self.icon.thumbnail_path is not None: if self.icon.thumbnail_path.startswith('app/'): return self.icon.thumbnail_path.replace('app/', '/') else: return self.icon.thumbnail_path if self.icon.source_url is not None: if self.icon.source_url.startswith('app/'): return self.icon.source_url.replace('app/', '/') else: return self.icon.source_url return '/static/images/1px.gif' @cache.memoize(timeout=500) def header_image(self) -> str: if self.image_id is not None: if self.image.file_path is not None: if self.image.file_path.startswith('app/'): return self.image.file_path.replace('app/', '/') else: return self.image.file_path if self.image.source_url is not None: if self.image.source_url.startswith('app/'): return self.image.source_url.replace('app/', '/') else: return self.image.source_url return '' def display_name(self) -> str: if self.ap_id is None: return self.title else: return f"{self.title}@{self.ap_domain}" def link(self) -> str: if self.ap_id is None: return self.name else: return self.ap_id.lower() @cache.memoize(timeout=3) def moderators(self): return CommunityMember.query.filter((CommunityMember.community_id == self.id) & (or_( CommunityMember.is_owner, CommunityMember.is_moderator )) ).filter(CommunityMember.is_banned == False).all() def is_member(self, user): if user is None: return CommunityMember.query.filter(CommunityMember.user_id == current_user.get_id(), CommunityMember.community_id == self.id, CommunityMember.is_banned == False).all() else: return CommunityMember.query.filter(CommunityMember.user_id == user.id, CommunityMember.community_id == self.id, CommunityMember.is_banned == False).all() def is_moderator(self, user=None): if user is None: return any(moderator.user_id == current_user.get_id() for moderator in self.moderators()) else: return any(moderator.user_id == user.id for moderator in self.moderators()) def is_owner(self, user=None): if user is None: return any(moderator.user_id == current_user.get_id() and moderator.is_owner for moderator in self.moderators()) else: return any(moderator.user_id == user.id and moderator.is_owner for moderator in self.moderators()) def is_instance_admin(self, user): if self.instance_id: instance_role = InstanceRole.query.filter(InstanceRole.instance_id == self.instance_id, InstanceRole.user_id == user.id, InstanceRole.role == 'admin').first() return instance_role is not None else: return False def user_is_banned(self, user): # use communities_banned_from() instead of this method, where possible. Redis caches the result of communities_banned_from() # we cannot use communities_banned_from() in models.py because it causes a circular import community_bans = CommunityBan.query.filter(CommunityBan.user_id == user.id).all() return self.id in [cb.community_id for cb in community_bans] def profile_id(self): retval = self.ap_profile_id if self.ap_profile_id else f"https://{current_app.config['SERVER_NAME']}/c/{self.name}" return retval.lower() def public_url(self): result = self.ap_public_url if self.ap_public_url else f"https://{current_app.config['SERVER_NAME']}/c/{self.name}" return result def is_local(self): return self.ap_id is None or self.profile_id().startswith('https://' + current_app.config['SERVER_NAME']) def local_url(self): if self.is_local(): return self.ap_profile_id else: return f"https://{current_app.config['SERVER_NAME']}/c/{self.ap_id}" def notify_new_posts(self, user_id: int) -> bool: existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_COMMUNITY).first() return existing_notification is not None # ids of all the users who want to be notified when there is a post in this community def notification_subscribers(self): return list(db.session.execute(text('SELECT user_id FROM "notification_subscription" WHERE entity_id = :community_id AND type = :type '), {'community_id': self.id, 'type': NOTIF_COMMUNITY}).scalars()) # instances that have users which are members of this community. (excluding the current instance) def following_instances(self, include_dormant=False) -> List[Instance]: instances = Instance.query.join(User, User.instance_id == Instance.id).join(CommunityMember, CommunityMember.user_id == User.id) instances = instances.filter(CommunityMember.community_id == self.id, CommunityMember.is_banned == False) if not include_dormant: instances = instances.filter(Instance.dormant == False) instances = instances.filter(Instance.id != 1, Instance.gone_forever == False) return instances.all() def has_followers_from_domain(self, domain: str) -> bool: instances = Instance.query.join(User, User.instance_id == Instance.id).join(CommunityMember, CommunityMember.user_id == User.id) instances = instances.filter(CommunityMember.community_id == self.id, CommunityMember.is_banned == False) for instance in instances: if instance.domain == domain: return True return False def loop_videos(self) -> bool: return 'gifs' in self.name def delete_dependencies(self): for post in self.posts: post.delete_dependencies() db.session.delete(post) db.session.query(CommunityBan).filter(CommunityBan.community_id == self.id).delete() db.session.query(CommunityBlock).filter(CommunityBlock.community_id == self.id).delete() db.session.query(CommunityJoinRequest).filter(CommunityJoinRequest.community_id == self.id).delete() db.session.query(CommunityMember).filter(CommunityMember.community_id == self.id).delete() db.session.query(Report).filter(Report.suspect_community_id == self.id).delete() user_role = db.Table('user_role', db.Column('user_id', db.Integer, db.ForeignKey('user.id')), db.Column('role_id', db.Integer, db.ForeignKey('role.id')), db.PrimaryKeyConstraint('user_id', 'role_id') ) class User(UserMixin, db.Model): query_class = FullTextSearchQuery id = db.Column(db.Integer, primary_key=True) user_name = db.Column(db.String(255), index=True) title = db.Column(db.String(256)) email = db.Column(db.String(255), index=True) password_hash = db.Column(db.String(128)) verified = db.Column(db.Boolean, default=False) verification_token = db.Column(db.String(16), index=True) banned = db.Column(db.Boolean, default=False) deleted = db.Column(db.Boolean, default=False) about = db.Column(db.Text) # markdown about_html = db.Column(db.Text) # html keywords = db.Column(db.String(256)) matrix_user_id = db.Column(db.String(256)) hide_nsfw = db.Column(db.Integer, default=1) hide_nsfl = db.Column(db.Integer, default=1) created = db.Column(db.DateTime, default=utcnow) last_seen = db.Column(db.DateTime, default=utcnow, index=True) avatar_id = db.Column(db.Integer, db.ForeignKey('file.id'), index=True) cover_id = db.Column(db.Integer, db.ForeignKey('file.id'), index=True) public_key = db.Column(db.Text) private_key = db.Column(db.Text) newsletter = db.Column(db.Boolean, default=True) email_unread = db.Column(db.Boolean, default=True) # True if they want to receive 'unread notifications' emails email_unread_sent = db.Column(db.Boolean) # True after a 'unread notifications' email has been sent. None for remote users receive_message_mode = db.Column(db.String(20), default='Closed') # possible values: Open, TrustedOnly, Closed bounces = db.Column(db.SmallInteger, default=0) timezone = db.Column(db.String(20)) reputation = db.Column(db.Float, default=0.0) attitude = db.Column(db.Float, default=1.0) # (upvotes cast - downvotes cast) / (upvotes + downvotes). A number between 1 and -1 is the ratio between up and down votes they cast stripe_customer_id = db.Column(db.String(50)) stripe_subscription_id = db.Column(db.String(50)) searchable = db.Column(db.Boolean, default=True) indexable = db.Column(db.Boolean, default=False) bot = db.Column(db.Boolean, default=False) ignore_bots = db.Column(db.Integer, default=0) unread_notifications = db.Column(db.Integer, default=0) ip_address = db.Column(db.String(50)) ip_address_country = db.Column(db.String(50)) instance_id = db.Column(db.Integer, db.ForeignKey('instance.id'), index=True) reports = db.Column(db.Integer, default=0) # how many times this user has been reported. default_sort = db.Column(db.String(25), default='hot') theme = db.Column(db.String(20), default='') referrer = db.Column(db.String(256)) markdown_editor = db.Column(db.Boolean, default=False) interface_language = db.Column(db.String(10)) # a locale that the translation system understands e.g. 'en' or 'en-us'. If empty, use browser default language_id = db.Column(db.Integer, db.ForeignKey('language.id')) # the default choice in the language dropdown when composing posts & comments reply_collapse_threshold = db.Column(db.Integer, default=-10) reply_hide_threshold = db.Column(db.Integer, default=-20) avatar = db.relationship('File', lazy='joined', foreign_keys=[avatar_id], single_parent=True, cascade="all, delete-orphan") cover = db.relationship('File', lazy='joined', foreign_keys=[cover_id], single_parent=True, cascade="all, delete-orphan") instance = db.relationship('Instance', lazy='joined', foreign_keys=[instance_id]) conversations = db.relationship('Conversation', lazy='dynamic', secondary=conversation_member, backref=db.backref('members', lazy='joined')) ap_id = db.Column(db.String(255), index=True) # e.g. username@server ap_profile_id = db.Column(db.String(255), index=True) # e.g. https://server/u/username ap_public_url = db.Column(db.String(255)) # e.g. https://server/u/UserName ap_fetched_at = db.Column(db.DateTime) ap_followers_url = db.Column(db.String(255)) ap_preferred_username = db.Column(db.String(255)) ap_manually_approves_followers = db.Column(db.Boolean, default=False) ap_deleted_at = db.Column(db.DateTime) ap_inbox_url = db.Column(db.String(255)) ap_domain = db.Column(db.String(255)) search_vector = db.Column(TSVectorType('user_name', 'about', 'keywords')) activity = db.relationship('ActivityLog', backref='account', lazy='dynamic', cascade="all, delete-orphan") posts = db.relationship('Post', lazy='dynamic', cascade="all, delete-orphan") post_replies = db.relationship('PostReply', lazy='dynamic', cascade="all, delete-orphan") roles = db.relationship('Role', secondary=user_role, lazy='dynamic', cascade="all, delete") def __repr__(self): return ''.format(self.user_name, self.id) def set_password(self, password): self.password_hash = generate_password_hash(password) def check_password(self, password): try: result = check_password_hash(self.password_hash, password) return result except Exception: return False def get_id(self): if self.is_authenticated: return self.id else: return 0 def display_name(self): if self.deleted is False: if self.title: return self.title else: return self.user_name else: return '[deleted]' @cache.memoize(timeout=500) def avatar_thumbnail(self) -> str: if self.avatar_id is not None: if self.avatar.thumbnail_path is not None: if self.avatar.thumbnail_path.startswith('app/'): return self.avatar.thumbnail_path.replace('app/', '/') else: return self.avatar.thumbnail_path else: return self.avatar_image() return '' @cache.memoize(timeout=500) def avatar_image(self) -> str: if self.avatar_id is not None: if self.avatar.file_path is not None: if self.avatar.file_path.startswith('app/'): return self.avatar.file_path.replace('app/', '/') else: return self.avatar.file_path if self.avatar.source_url is not None: if self.avatar.source_url.startswith('app/'): return self.avatar.source_url.replace('app/', '/') else: return self.avatar.source_url return '' @cache.memoize(timeout=500) def cover_image(self) -> str: if self.cover_id is not None: if self.cover.thumbnail_path is not None: if self.cover.thumbnail_path.startswith('app/'): return self.cover.thumbnail_path.replace('app/', '/') else: return self.cover.thumbnail_path if self.cover.source_url is not None: if self.cover.source_url.startswith('app/'): return self.cover.source_url.replace('app/', '/') else: return self.cover.source_url return '' def filesize(self): size = 0 if self.avatar_id: size += self.avatar.filesize() if self.cover_id: size += self.cover.filesize() return size def num_content(self): content = 0 content += db.session.execute(text('SELECT COUNT(id) as c FROM "post" WHERE user_id = :user_id'), {'user_id': self.id}).scalar() content += db.session.execute(text('SELECT COUNT(id) as c FROM "post_reply" WHERE user_id = :user_id'), {'user_id': self.id}).scalar() return content def is_local(self): return self.ap_id is None or self.ap_profile_id.startswith('https://' + current_app.config['SERVER_NAME']) def waiting_for_approval(self): application = UserRegistration.query.filter_by(user_id=self.id, status=0).first() return application is not None @cache.memoize(timeout=30) def is_admin(self): for role in self.roles: if role.name == 'Admin': return True return False @cache.memoize(timeout=30) def is_staff(self): for role in self.roles: if role.name == 'Staff': return True return False def is_instance_admin(self): if self.instance_id: instance_role = InstanceRole.query.filter(InstanceRole.instance_id == self.instance_id, InstanceRole.user_id == self.id, InstanceRole.role == 'admin').first() return instance_role is not None else: return False def trustworthy(self): if self.is_admin(): return True if self.created_recently() or self.reputation < 100: return False return True def link(self) -> str: if self.is_local(): return self.user_name else: return self.ap_id def followers_url(self): if self.ap_followers_url: return self.ap_followers_url else: return self.public_url() + '/followers' def get_reset_password_token(self, expires_in=600): return jwt.encode( {'reset_password': self.id, 'exp': time() + expires_in}, current_app.config['SECRET_KEY'], algorithm='HS256') def another_account_using_email(self, email): another_account = User.query.filter(User.email == email, User.id != self.id).first() return another_account is not None def expires_soon(self): if self.expires is None: return False return self.expires < utcnow() + timedelta(weeks=1) def is_expired(self): if self.expires is None: return True return self.expires < utcnow() def expired_ages_ago(self): if self.expires is None: return True return self.expires < datetime(2019, 9, 1) def recalculate_attitude(self): upvotes = db.session.execute(text('SELECT COUNT(id) as c FROM "post_vote" WHERE user_id = :user_id AND effect > 0'), {'user_id': self.id}).scalar() downvotes = db.session.execute(text('SELECT COUNT(id) as c FROM "post_vote" WHERE user_id = :user_id AND effect < 0'), {'user_id': self.id}).scalar() if upvotes is None: upvotes = 0 if downvotes is None: downvotes = 0 comment_upvotes = db.session.execute(text('SELECT COUNT(id) as c FROM "post_reply_vote" WHERE user_id = :user_id AND effect > 0'), {'user_id': self.id}).scalar() comment_downvotes = db.session.execute(text('SELECT COUNT(id) as c FROM "post_reply_vote" WHERE user_id = :user_id AND effect < 0'), {'user_id': self.id}).scalar() if comment_upvotes is None: comment_upvotes = 0 if comment_downvotes is None: comment_downvotes = 0 total_upvotes = upvotes + comment_upvotes total_downvotes = downvotes + comment_downvotes if total_downvotes == 0: # guard against division by zero self.attitude = 1.0 else: if total_upvotes + total_downvotes > 2: # Only calculate attitude if they've done 3 or more votes as anything less than this could be an outlier and not representative of their overall attitude self.attitude = (total_upvotes - total_downvotes) / (total_upvotes + total_downvotes) else: self.attitude = 1.0 def subscribed(self, community_id: int) -> int: if community_id is None: return False subscription:CommunityMember = CommunityMember.query.filter_by(user_id=self.id, community_id=community_id).first() if subscription: if subscription.is_banned: return SUBSCRIPTION_BANNED elif subscription.is_owner: return SUBSCRIPTION_OWNER elif subscription.is_moderator: return SUBSCRIPTION_MODERATOR else: return SUBSCRIPTION_MEMBER else: join_request = CommunityJoinRequest.query.filter_by(user_id=self.id, community_id=community_id).first() if join_request: return SUBSCRIPTION_PENDING else: return SUBSCRIPTION_NONMEMBER def communities(self) -> List[Community]: return Community.query.filter(Community.banned == False).\ join(CommunityMember).filter(CommunityMember.is_banned == False, CommunityMember.user_id == self.id).all() def profile_id(self): result = self.ap_profile_id if self.ap_profile_id else f"https://{current_app.config['SERVER_NAME']}/u/{self.user_name.lower()}" return result def public_url(self): result = self.ap_public_url if self.ap_public_url else f"https://{current_app.config['SERVER_NAME']}/u/{self.user_name}" return result def created_recently(self): if self.is_admin(): return False return self.created and self.created > utcnow() - timedelta(days=7) def has_blocked_instance(self, instance_id: int): instance_block = InstanceBlock.query.filter_by(user_id=self.id, instance_id=instance_id).first() return instance_block is not None def has_blocked_user(self, user_id: int): existing_block = UserBlock.query.filter_by(blocker_id=self.id, blocked_id=user_id).first() return existing_block is not None @staticmethod def verify_reset_password_token(token): try: id = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])['reset_password'] except: return return User.query.get(id) def delete_dependencies(self): if self.cover_id: file = File.query.get(self.cover_id) file.delete_from_disk() self.cover_id = None db.session.delete(file) if self.avatar_id: file = File.query.get(self.avatar_id) file.delete_from_disk() self.avatar_id = None db.session.delete(file) if self.waiting_for_approval(): db.session.query(UserRegistration).filter(UserRegistration.user_id == self.id).delete() db.session.query(NotificationSubscription).filter(NotificationSubscription.user_id == self.id).delete() db.session.query(Notification).filter(Notification.user_id == self.id).delete() db.session.query(PollChoiceVote).filter(PollChoiceVote.user_id == self.id).delete() db.session.query(PostBookmark).filter(PostBookmark.user_id == self.id).delete() db.session.query(PostReplyBookmark).filter(PostReplyBookmark.user_id == self.id).delete() def purge_content(self, soft=True): files = File.query.join(Post).filter(Post.user_id == self.id).all() for file in files: file.delete_from_disk() self.delete_dependencies() posts = Post.query.filter_by(user_id=self.id).all() for post in posts: post.delete_dependencies() if soft: post.deleted = True else: db.session.delete(post) db.session.commit() post_replies = PostReply.query.filter_by(user_id=self.id).all() for reply in post_replies: reply.delete_dependencies() if soft: reply.deleted = True else: db.session.delete(reply) db.session.commit() def mention_tag(self): if self.ap_domain is None: return '@' + self.user_name + '@' + current_app.config['SERVER_NAME'] else: return '@' + self.user_name + '@' + self.ap_domain # True if user_id wants to be notified about posts by self def notify_new_posts(self, user_id): existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_USER).first() return existing_notification is not None # ids of all the users who want to be notified when self makes a post def notification_subscribers(self): return list(db.session.execute(text('SELECT user_id FROM "notification_subscription" WHERE entity_id = :user_id AND type = :type '), {'user_id': self.id, 'type': NOTIF_USER}).scalars()) class ActivityLog(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) activity_type = db.Column(db.String(64)) activity = db.Column(db.String(255)) timestamp = db.Column(db.DateTime, index=True, default=utcnow) class Post(db.Model): query_class = FullTextSearchQuery id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) image_id = db.Column(db.Integer, db.ForeignKey('file.id'), index=True) domain_id = db.Column(db.Integer, db.ForeignKey('domain.id'), index=True) instance_id = db.Column(db.Integer, db.ForeignKey('instance.id'), index=True) slug = db.Column(db.String(255)) title = db.Column(db.String(255)) url = db.Column(db.String(2048)) body = db.Column(db.Text) body_html = db.Column(db.Text) type = db.Column(db.Integer) comments_enabled = db.Column(db.Boolean, default=True) deleted = db.Column(db.Boolean, default=False, index=True) mea_culpa = db.Column(db.Boolean, default=False) has_embed = db.Column(db.Boolean, default=False) reply_count = db.Column(db.Integer, default=0) score = db.Column(db.Integer, default=0, index=True) # used for 'top' ranking nsfw = db.Column(db.Boolean, default=False, index=True) nsfl = db.Column(db.Boolean, default=False, index=True) sticky = db.Column(db.Boolean, default=False) notify_author = db.Column(db.Boolean, default=True) indexable = db.Column(db.Boolean, default=True) from_bot = db.Column(db.Boolean, default=False, index=True) created_at = db.Column(db.DateTime, index=True, default=utcnow) # this is when the content arrived here posted_at = db.Column(db.DateTime, index=True, default=utcnow) # this is when the original server created it last_active = db.Column(db.DateTime, index=True, default=utcnow) ip = db.Column(db.String(50)) up_votes = db.Column(db.Integer, default=0) down_votes = db.Column(db.Integer, default=0) ranking = db.Column(db.Integer, default=0, index=True) # used for 'hot' ranking edited_at = db.Column(db.DateTime) reports = db.Column(db.Integer, default=0) # how many times this post has been reported. Set to -1 to ignore reports language_id = db.Column(db.Integer, db.ForeignKey('language.id'), index=True) cross_posts = db.Column(MutableList.as_mutable(ARRAY(db.Integer))) tags = db.relationship('Tag', lazy='dynamic', secondary=post_tag, backref=db.backref('posts', lazy='dynamic')) ap_id = db.Column(db.String(255), index=True) ap_create_id = db.Column(db.String(100)) ap_announce_id = db.Column(db.String(100)) search_vector = db.Column(TSVectorType('title', 'body')) image = db.relationship(File, lazy='joined', foreign_keys=[image_id], cascade="all, delete") domain = db.relationship('Domain', lazy='joined', foreign_keys=[domain_id]) author = db.relationship('User', lazy='joined', overlaps='posts', foreign_keys=[user_id]) community = db.relationship('Community', lazy='joined', overlaps='posts', foreign_keys=[community_id]) replies = db.relationship('PostReply', lazy='dynamic', backref='post') language = db.relationship('Language', foreign_keys=[language_id]) def is_local(self): return self.ap_id is None or self.ap_id.startswith('https://' + current_app.config['SERVER_NAME']) @classmethod def get_by_ap_id(cls, ap_id): return cls.query.filter_by(ap_id=ap_id).first() def delete_dependencies(self): db.session.query(PostBookmark).filter(PostBookmark.post_id == self.id).delete() db.session.query(PollChoiceVote).filter(PollChoiceVote.post_id == self.id).delete() db.session.query(PollChoice).filter(PollChoice.post_id == self.id).delete() db.session.query(Poll).filter(Poll.post_id == self.id).delete() db.session.query(Report).filter(Report.suspect_post_id == self.id).delete() db.session.execute(text('DELETE FROM post_reply_vote WHERE post_reply_id IN (SELECT id FROM post_reply WHERE post_id = :post_id)'), {'post_id': self.id}) db.session.execute(text('DELETE FROM post_reply WHERE post_id = :post_id'), {'post_id': self.id}) db.session.execute(text('DELETE FROM post_vote WHERE post_id = :post_id'), {'post_id': self.id}) if self.image_id: file = File.query.get(self.image_id) file.delete_from_disk() def youtube_embed(self): if self.url: vpos = self.url.find('v=') if vpos != -1: return self.url[vpos + 2:vpos + 13] def peertube_embed(self): if self.url: return self.url.replace('watch', 'embed') def profile_id(self): if self.ap_id: return self.ap_id else: return f"https://{current_app.config['SERVER_NAME']}/post/{self.id}" def public_url(self): return self.profile_id() def blocked_by_content_filter(self, content_filters): lowercase_title = self.title.lower() for name, keywords in content_filters.items() if content_filters else {}: for keyword in keywords: if keyword in lowercase_title: return name return False def notify_new_replies(self, user_id: int) -> bool: existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_POST).first() return existing_notification is not None def language_code(self): if self.language_id: return self.language.code else: return 'en' def language_name(self): if self.language_id: return self.language.name else: return 'English' def tags_for_activitypub(self): return_value = [] for tag in self.tags: return_value.append({'type': 'Hashtag', 'href': f'https://{current_app.config["SERVER_NAME"]}/tag/{tag.name}', 'name': f'#{tag.name}'}) return return_value class PostReply(db.Model): query_class = FullTextSearchQuery id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) domain_id = db.Column(db.Integer, db.ForeignKey('domain.id'), index=True) image_id = db.Column(db.Integer, db.ForeignKey('file.id'), index=True) parent_id = db.Column(db.Integer, index=True) root_id = db.Column(db.Integer) depth = db.Column(db.Integer, default=0) instance_id = db.Column(db.Integer, db.ForeignKey('instance.id'), index=True) body = db.Column(db.Text) body_html = db.Column(db.Text) body_html_safe = db.Column(db.Boolean, default=False) score = db.Column(db.Integer, default=0, index=True) # used for 'top' sorting nsfw = db.Column(db.Boolean, default=False) nsfl = db.Column(db.Boolean, default=False) notify_author = db.Column(db.Boolean, default=True) created_at = db.Column(db.DateTime, index=True, default=utcnow) posted_at = db.Column(db.DateTime, index=True, default=utcnow) deleted = db.Column(db.Boolean, default=False, index=True) ip = db.Column(db.String(50)) from_bot = db.Column(db.Boolean, default=False) up_votes = db.Column(db.Integer, default=0) down_votes = db.Column(db.Integer, default=0) ranking = db.Column(db.Float, default=0.0, index=True) # used for 'hot' sorting language_id = db.Column(db.Integer, db.ForeignKey('language.id'), index=True) edited_at = db.Column(db.DateTime) reports = db.Column(db.Integer, default=0) # how many times this post has been reported. Set to -1 to ignore reports ap_id = db.Column(db.String(255), index=True) ap_create_id = db.Column(db.String(100)) ap_announce_id = db.Column(db.String(100)) search_vector = db.Column(TSVectorType('body')) author = db.relationship('User', lazy='joined', foreign_keys=[user_id], single_parent=True, overlaps="post_replies") community = db.relationship('Community', lazy='joined', overlaps='replies', foreign_keys=[community_id]) language = db.relationship('Language', foreign_keys=[language_id]) def language_code(self): if self.language_id: return self.language.code else: return 'en' def language_name(self): if self.language_id: return self.language.name else: return 'English' def is_local(self): return self.ap_id is None or self.ap_id.startswith('https://' + current_app.config['SERVER_NAME']) @classmethod def get_by_ap_id(cls, ap_id): return cls.query.filter_by(ap_id=ap_id).first() def profile_id(self): if self.ap_id: return self.ap_id else: return f"https://{current_app.config['SERVER_NAME']}/comment/{self.id}" def public_url(self): return self.profile_id() # the ap_id of the parent object, whether it's another PostReply or a Post def in_reply_to(self): if self.parent_id is None: return self.post.ap_id else: parent = PostReply.query.get(self.parent_id) return parent.ap_id # the AP profile of the person who wrote the parent object, which could be another PostReply or a Post def to(self): if self.parent_id is None: return self.post.author.public_url() else: parent = PostReply.query.get(self.parent_id) return parent.author.public_url() def delete_dependencies(self): for child_reply in self.child_replies(): child_reply.delete_dependencies() db.session.delete(child_reply) db.session.query(PostReplyBookmark).filter(PostReplyBookmark.post_reply_id == self.id).delete() db.session.query(Report).filter(Report.suspect_post_reply_id == self.id).delete() db.session.execute(text('DELETE FROM post_reply_vote WHERE post_reply_id = :post_reply_id'), {'post_reply_id': self.id}) if self.image_id: file = File.query.get(self.image_id) file.delete_from_disk() def child_replies(self): return PostReply.query.filter_by(parent_id=self.id).all() def has_replies(self): reply = PostReply.query.filter_by(parent_id=self.id).filter(PostReply.deleted == False).first() return reply is not None def blocked_by_content_filter(self, content_filters): lowercase_body = self.body.lower() for name, keywords in content_filters.items() if content_filters else {}: for keyword in keywords: if keyword in lowercase_body: return name return False def notify_new_replies(self, user_id: int) -> bool: existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_REPLY).first() return existing_notification is not None class Domain(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(255), index=True) post_count = db.Column(db.Integer, default=0) banned = db.Column(db.Boolean, default=False, index=True) # Domains can be banned site-wide (by admin) or DomainBlock'ed by users notify_mods = db.Column(db.Boolean, default=False, index=True) notify_admins = db.Column(db.Boolean, default=False, index=True) def blocked_by(self, user): block = DomainBlock.query.filter_by(domain_id=self.id, user_id=user.id).first() return block is not None def purge_content(self): files = File.query.join(Post).filter(Post.domain_id == self.id).all() for file in files: file.delete_from_disk() posts = Post.query.filter_by(domain_id=self.id).all() for post in posts: post.delete_dependencies() db.session.delete(post) db.session.commit() class DomainBlock(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) domain_id = db.Column(db.Integer, db.ForeignKey('domain.id'), primary_key=True) created_at = db.Column(db.DateTime, default=utcnow) class CommunityBlock(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) created_at = db.Column(db.DateTime, default=utcnow) class CommunityMember(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) is_moderator = db.Column(db.Boolean, default=False) is_owner = db.Column(db.Boolean, default=False) is_banned = db.Column(db.Boolean, default=False, index=True) notify_new_posts = db.Column(db.Boolean, default=False) created_at = db.Column(db.DateTime, default=utcnow) class CommunityWikiPage(db.Model): id = db.Column(db.Integer, primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) slug = db.Column(db.String(100), index=True) title = db.Column(db.String(255)) body = db.Column(db.Text) body_html = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) edited_at = db.Column(db.DateTime, default=utcnow) who_can_edit = db.Column(db.Integer, default=0) # 0 = mods & admins, 1 = trusted, 2 = community members, 3 = anyone revisions = db.relationship('CommunityWikiPageRevision', backref=db.backref('page'), cascade='all,delete', lazy='dynamic') def can_edit(self, user: User, community: Community): if user.is_anonymous: return False if self.who_can_edit == 0: if user.is_admin() or user.is_staff() or community.is_moderator(user): return True elif self.who_can_edit == 1: if user.is_admin() or user.is_staff() or community.is_moderator(user) or user.trustworthy(): return True elif self.who_can_edit == 2: if user.is_admin() or user.is_staff() or community.is_moderator(user) or user.trustworthy() or community.is_member(user): return True elif self.who_can_edit == 3: return True return False class CommunityWikiPageRevision(db.Model): id = db.Column(db.Integer, primary_key=True) wiki_page_id = db.Column(db.Integer, db.ForeignKey('community_wiki_page.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) title = db.Column(db.String(255)) body = db.Column(db.Text) body_html = db.Column(db.Text) edited_at = db.Column(db.DateTime, default=utcnow) author = db.relationship('User', lazy='joined', foreign_keys=[user_id]) class UserFollower(db.Model): local_user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) remote_user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) is_accepted = db.Column(db.Boolean, default=True) # flip to ban remote user / reject follow is_inward = db.Column(db.Boolean, default=True) # true = remote user is following a local one created_at = db.Column(db.DateTime, default=utcnow) # people banned from communities class CommunityBan(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) # person who is banned, not the banner community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) banned_by = db.Column(db.Integer, db.ForeignKey('user.id')) reason = db.Column(db.String(50)) created_at = db.Column(db.DateTime, default=utcnow) ban_until = db.Column(db.DateTime) class UserNote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) target_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) body = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) class UserBlock(db.Model): id = db.Column(db.Integer, primary_key=True) blocker_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) blocked_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class Settings(db.Model): name = db.Column(db.String(50), primary_key=True) value = db.Column(db.String(1024)) class Interest(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) communities = db.Column(db.Text) class CommunityJoinRequest(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) class UserFollowRequest(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) follow_id = db.Column(db.Integer, db.ForeignKey('user.id')) class UserRegistration(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) answer = db.Column(db.String(512)) status = db.Column(db.Integer, default=0, index=True) # 0 = unapproved, 1 = approved created_at = db.Column(db.DateTime, default=utcnow) approved_at = db.Column(db.DateTime) approved_by = db.Column(db.Integer, db.ForeignKey('user.id')) user = db.relationship('User', foreign_keys=[user_id], lazy='joined') class PostVote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) author_id = db.Column(db.Integer, db.ForeignKey('user.id')) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) effect = db.Column(db.Float, index=True) created_at = db.Column(db.DateTime, default=utcnow) post = db.relationship('Post', foreign_keys=[post_id]) class PostReplyVote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # who voted author_id = db.Column(db.Integer, db.ForeignKey('user.id')) # the author of the reply voted on - who's reputation is affected post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id'), index=True) effect = db.Column(db.Float) created_at = db.Column(db.DateTime, default=utcnow) # save every activity to a log, to aid debugging class ActivityPubLog(db.Model): id = db.Column(db.Integer, primary_key=True) direction = db.Column(db.String(3)) # 'in' or 'out' activity_id = db.Column(db.String(256), index=True) activity_type = db.Column(db.String(50)) # e.g. 'Follow', 'Accept', 'Like', etc activity_json = db.Column(db.Text) # the full json of the activity result = db.Column(db.String(10)) # 'success' or 'failure' exception_message = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) class Filter(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(50)) filter_home = db.Column(db.Boolean, default=True) filter_posts = db.Column(db.Boolean, default=True) filter_replies = db.Column(db.Boolean, default=False) hide_type = db.Column(db.Integer, default=0) # 0 = hide with warning, 1 = hide completely user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) expire_after = db.Column(db.Date) keywords = db.Column(db.String(500)) def keywords_string(self): if self.keywords is None or self.keywords == '': return '' split_keywords = [kw.strip() for kw in self.keywords.split('\n')] return ', '.join(split_keywords) class Role(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) weight = db.Column(db.Integer, default=0) permissions = db.relationship('RolePermission') class RolePermission(db.Model): role_id = db.Column(db.Integer, db.ForeignKey('role.id'), primary_key=True) permission = db.Column(db.String, primary_key=True, index=True) class Notification(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(50)) url = db.Column(db.String(512)) read = db.Column(db.Boolean, default=False) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # who the notification should go to author_id = db.Column(db.Integer, db.ForeignKey('user.id')) # the person who caused the notification to happen created_at = db.Column(db.DateTime, default=utcnow) class Report(db.Model): id = db.Column(db.Integer, primary_key=True) reasons = db.Column(db.String(256)) description = db.Column(db.String(256)) status = db.Column(db.Integer, default=0) # 0 = new, 1 = escalated to admin, 2 = being appealed, 3 = resolved, 4 = discarded type = db.Column(db.Integer, default=0) # 0 = user, 1 = post, 2 = reply, 3 = community, 4 = conversation reporter_id = db.Column(db.Integer, db.ForeignKey('user.id')) suspect_community_id = db.Column(db.Integer, db.ForeignKey('community.id')) suspect_user_id = db.Column(db.Integer, db.ForeignKey('user.id')) suspect_post_id = db.Column(db.Integer, db.ForeignKey('post.id')) suspect_post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id')) suspect_conversation_id = db.Column(db.Integer, db.ForeignKey('conversation.id')) in_community_id = db.Column(db.Integer, db.ForeignKey('community.id')) source_instance_id = db.Column(db.Integer, db.ForeignKey('instance.id')) # the instance of the reporter. mostly used to distinguish between local (instance 1) and remote reports created_at = db.Column(db.DateTime, default=utcnow) updated = db.Column(db.DateTime, default=utcnow) # textual representation of self.type def type_text(self): types = ('User', 'Post', 'Comment', 'Community', 'Conversation') if self.type is None: return '' else: return types[self.type] def is_local(self): return self.source_instance_id == 1 class NotificationSubscription(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(256)) # to avoid needing to look up the thing subscribed to via entity_id type = db.Column(db.Integer, default=0, index=True) # see constants.py for possible values: NOTIF_* entity_id = db.Column(db.Integer, index=True) # ID of the user, post, community, etc being subscribed to user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # To whom this subscription belongs created_at = db.Column(db.DateTime, default=utcnow) # Perhaps very old subscriptions can be automatically deleted class Poll(db.Model): post_id = db.Column(db.Integer, db.ForeignKey('post.id'), primary_key=True) end_poll = db.Column(db.DateTime) mode = db.Column(db.String(10)) # 'single' or 'multiple' determines whether people can vote for one or multiple options local_only = db.Column(db.Boolean) latest_vote = db.Column(db.DateTime) def has_voted(self, user_id): existing_vote = PollChoiceVote.query.filter(PollChoiceVote.user_id == user_id, PollChoiceVote.post_id == self.post_id).first() return existing_vote is not None def vote_for_choice(self, choice_id, user_id): existing_vote = PollChoiceVote.query.filter(PollChoiceVote.user_id == user_id, PollChoiceVote.choice_id == choice_id).first() if not existing_vote: new_vote = PollChoiceVote(choice_id=choice_id, user_id=user_id, post_id=self.post_id) db.session.add(new_vote) choice = PollChoice.query.get(choice_id) choice.num_votes += 1 self.latest_vote = datetime.utcnow() db.session.commit() def total_votes(self): return db.session.execute(text('SELECT SUM(num_votes) as s FROM "poll_choice" WHERE post_id = :post_id'), {'post_id': self.post_id}).scalar() class PollChoice(db.Model): id = db.Column(db.Integer, primary_key=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) choice_text = db.Column(db.String(200)) sort_order = db.Column(db.Integer) num_votes = db.Column(db.Integer, default=0) def percentage(self, poll_total_votes): return math.ceil(self.num_votes / poll_total_votes * 100) class PollChoiceVote(db.Model): choice_id = db.Column(db.Integer, db.ForeignKey('poll_choice.id'), primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class PostBookmark(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class PostReplyBookmark(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class ModLog(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) type = db.Column(db.String(10)) # 'mod' or 'admin' action = db.Column(db.String(30)) # 'removing post', 'banning from community', etc reason = db.Column(db.String(512)) link = db.Column(db.String(512)) link_text = db.Column(db.String(512)) public = db.Column(db.Boolean, default=False) created_at = db.Column(db.DateTime, default=utcnow) community = db.relationship('Community', lazy='joined', foreign_keys=[community_id]) author = db.relationship('User', lazy='joined', foreign_keys=[user_id]) action_map = { 'add_mod': _l('Added moderator'), 'remove_mod': _l('Removed moderator'), 'featured_post': _l('Featured post'), 'unfeatured_post': _l('Unfeatured post'), 'delete_post': _l('Deleted post'), 'restore_post': _l('Un-deleted post'), 'delete_post_reply': _l('Deleted comment'), 'restore_post_reply': _l('Un-deleted comment'), 'delete_community': _l('Deleted community'), 'delete_user': _l('Deleted account'), 'undelete_user': _l('Restored account'), 'ban_user': _l('Banned account'), 'unban_user': _l('Un-banned account'), } def action_to_str(self): if self.action in self.action_map: return self.action_map[self.action] else: return self.action class IpBan(db.Model): id = db.Column(db.Integer, primary_key=True) ip_address = db.Column(db.String(50), index=True) notes = db.Column(db.String(150)) created_at = db.Column(db.DateTime, default=utcnow) class Site(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(256)) description = db.Column(db.String(256)) icon_id = db.Column(db.Integer, db.ForeignKey('file.id')) sidebar = db.Column(db.Text, default='') legal_information = db.Column(db.Text, default='') public_key = db.Column(db.Text) private_key = db.Column(db.Text) enable_downvotes = db.Column(db.Boolean, default=True) allow_local_image_posts = db.Column(db.Boolean, default=True) remote_image_cache_days = db.Column(db.Integer, default=30) enable_nsfw = db.Column(db.Boolean, default=False) enable_nsfl = db.Column(db.Boolean, default=False) community_creation_admin_only = db.Column(db.Boolean, default=False) reports_email_admins = db.Column(db.Boolean, default=True) registration_mode = db.Column(db.String(20), default='Closed') # possible values: Open, RequireApplication, Closed application_question = db.Column(db.Text, default='') allow_or_block_list = db.Column(db.Integer, default=2) # 1 = allow list, 2 = block list allowlist = db.Column(db.Text, default='') blocklist = db.Column(db.Text, default='') blocked_phrases = db.Column(db.Text, default='') # discard incoming content with these phrases auto_decline_referrers = db.Column(db.Text, default='rdrama.net\nahrefs.com') # automatically decline registration requests if the referrer is one of these created_at = db.Column(db.DateTime, default=utcnow) updated = db.Column(db.DateTime, default=utcnow) last_active = db.Column(db.DateTime, default=utcnow) log_activitypub_json = db.Column(db.Boolean, default=False) default_theme = db.Column(db.String(20), default='') contact_email = db.Column(db.String(255), default='') about = db.Column(db.Text, default='') logo = db.Column(db.String(40), default='') logo_152 = db.Column(db.String(40), default='') logo_32 = db.Column(db.String(40), default='') logo_16 = db.Column(db.String(40), default='') show_inoculation_block = db.Column(db.Boolean, default=True) @staticmethod def admins() -> List[User]: return User.query.filter_by(deleted=False, banned=False).join(user_role).filter(user_role.c.role_id == ROLE_ADMIN).order_by(User.id).all() @staticmethod def staff() -> List[User]: return User.query.filter_by(deleted=False, banned=False).join(user_role).filter(user_role.c.role_id == ROLE_STAFF).order_by(User.id).all() #class IngressQueue(db.Model): # id = db.Column(db.Integer, primary_key=True) # waiting_for = db.Column(db.String(255), index=True) # The AP ID of the object we're waiting to be created before this Activity can be ingested # activity_pub_log_id = db.Column(db.Integer, db.ForeignKey('activity_pub_log.id')) # The original Activity that failed because some target object does not exist # ap_date_published = db.Column(db.DateTime, default=utcnow) # The value of the datePublished field on the Activity # created_at = db.Column(db.DateTime, default=utcnow) # expires = db.Column(db.DateTime, default=utcnow) # When to give up waiting and delete this row # # @login.user_loader def load_user(id): return User.query.get(int(id))