') or request_json['object']['content'].startswith('
')): request_json['object']['content'] = '' + request_json['object']['content'] + '
' post.body_html = allowlist_html(request_json['object']['content']) post.body = html_to_text(post.body_html) if microblog: autogenerated_title = microblog_content_to_title(post.body_html) if len(autogenerated_title) < 20: title = '[Microblog] ' + autogenerated_title.strip() else: title = autogenerated_title.strip() if '[NSFL]' in title.upper() or '(NSFL)' in title.upper(): post.nsfl = True if '[NSFW]' in title.upper() or '(NSFW)' in title.upper(): post.nsfw = True post.title = title # Discard post if it contains certain phrases. Good for stopping spam floods. blocked_phrases_list = blocked_phrases() for blocked_phrase in blocked_phrases_list: if blocked_phrase in post.title: return None if post.body: for blocked_phrase in blocked_phrases_list: if blocked_phrase in post.body: return None if 'attachment' in request_json['object'] and len(request_json['object']['attachment']) > 0 and \ 'type' in request_json['object']['attachment'][0]: alt_text = None if request_json['object']['attachment'][0]['type'] == 'Link': post.url = request_json['object']['attachment'][0]['href'] # Lemmy < 0.19.4 if request_json['object']['attachment'][0]['type'] == 'Document': post.url = request_json['object']['attachment'][0]['url'] # Mastodon if 'name' in request_json['object']['attachment'][0]: alt_text = request_json['object']['attachment'][0]['name'] if request_json['object']['attachment'][0]['type'] == 'Image': post.url = request_json['object']['attachment'][0]['url'] # PixelFed, PieFed, Lemmy >= 0.19.4 if 'name' in request_json['object']['attachment'][0]: alt_text = request_json['object']['attachment'][0]['name'] if post.url: if is_image_url(post.url): post.type = constants.POST_TYPE_IMAGE if 'image' in request_json['object'] and 'url' in request_json['object']['image']: image = File(source_url=request_json['object']['image']['url']) else: image = File(source_url=post.url) if alt_text: image.alt_text = alt_text db.session.add(image) post.image = image elif is_video_url(post.url): # youtube is detected later post.type = constants.POST_TYPE_VIDEO image = File(source_url=post.url) db.session.add(image) post.image = image else: post.type = constants.POST_TYPE_LINK domain = domain_from_url(post.url) # notify about links to banned websites. already_notified = set() # often admins and mods are the same people - avoid notifying them twice if domain.notify_mods: for community_member in post.community.moderators(): notify = Notification(title='Suspicious content', url=post.ap_id, user_id=community_member.user_id, author_id=user.id) db.session.add(notify) already_notified.add(community_member.user_id) if domain.notify_admins: for admin in Site.admins(): if admin.id not in already_notified: notify = Notification(title='Suspicious content', url=post.ap_id, user_id=admin.id, author_id=user.id) db.session.add(notify) if domain.banned or domain.name.endswith('.pages.dev'): raise Exception(domain.name + ' is blocked by admin') else: domain.post_count += 1 post.domain = domain if post is not None: if request_json['object']['type'] == 'Video': post.type = constants.POST_TYPE_VIDEO post.url = request_json['object']['id'] if 'icon' in request_json['object'] and isinstance(request_json['object']['icon'], list): icon = File(source_url=request_json['object']['icon'][-1]['url']) db.session.add(icon) post.image = icon # Language. Lemmy uses 'language' while Mastodon has 'contentMap' if 'language' in request_json['object'] and isinstance(request_json['object']['language'], dict): language = find_language_or_create(request_json['object']['language']['identifier'], request_json['object']['language']['name']) post.language_id = language.id elif 'contentMap' in request_json['object'] and isinstance(request_json['object']['contentMap'], dict): language = find_language(next(iter(request_json['object']['contentMap']))) post.language_id = language.id if language else None if 'tag' in request_json['object'] and isinstance(request_json['object']['tag'], list): for json_tag in request_json['object']['tag']: if json_tag and json_tag['type'] == 'Hashtag': if json_tag['name'][1:].lower() != community.name.lower(): # Lemmy adds the community slug as a hashtag on every post in the community, which we want to ignore hashtag = find_hashtag_or_create(json_tag['name']) if hashtag: post.tags.append(hashtag) if 'image' in request_json['object'] and post.image is None: image = File(source_url=request_json['object']['image']['url']) db.session.add(image) post.image = image if post.image is None and post.type == constants.POST_TYPE_LINK: # This is a link post but the source instance has not provided a thumbnail image # Let's see if we can do better than the source instance did! tn_url = post.url if tn_url[:32] == 'https://www.youtube.com/watch?v=': tn_url = 'https://youtu.be/' + tn_url[ 32:43] # better chance of thumbnail from youtu.be than youtube.com opengraph = opengraph_parse(tn_url) if opengraph and (opengraph.get('og:image', '') != '' or opengraph.get('og:image:url', '') != ''): filename = opengraph.get('og:image') or opengraph.get('og:image:url') if not filename.startswith('/'): file = File(source_url=filename, alt_text=shorten_string(opengraph.get('og:title'), 295)) post.image = file db.session.add(file) if 'searchableBy' in request_json['object'] and request_json['object']['searchableBy'] != 'https://www.w3.org/ns/activitystreams#Public': post.indexable = False if post.url: post.url = remove_tracking_from_link(post.url) # moved here as changes youtu.be to youtube.com if is_video_hosting_site(post.url): post.type = constants.POST_TYPE_VIDEO db.session.add(post) post.ranking = post.post_ranking(post.score, post.posted_at) community.post_count += 1 community.last_active = utcnow() user.post_count += 1 db.session.commit() # Polls need to be processed quite late because they need a post_id to refer to if request_json['object']['type'] == 'Question': post.type = constants.POST_TYPE_POLL mode = 'single' if 'anyOf' in request_json['object']: mode = 'multiple' poll = Poll(post_id=post.id, end_poll=request_json['object']['endTime'], mode=mode, local_only=False) db.session.add(poll) i = 1 for choice_ap in request_json['object']['oneOf' if mode == 'single' else 'anyOf']: new_choice = PollChoice(post_id=post.id, choice_text=choice_ap['name'], sort_order=i) db.session.add(new_choice) i += 1 db.session.commit() if post.image_id: make_image_sizes(post.image_id, 170, 512, 'posts', community.low_quality) # the 512 sized image is for masonry view # Update list of cross posts if post.url: other_posts = Post.query.filter(Post.id != post.id, Post.url == post.url, Post.deleted == False, Post.posted_at > post.posted_at - timedelta(days=6)).all() for op in other_posts: if op.cross_posts is None: op.cross_posts = [post.id] else: op.cross_posts.append(post.id) if post.cross_posts is None: post.cross_posts = [op.id] else: post.cross_posts.append(op.id) db.session.commit() if post.community_id not in communities_banned_from(user.id): notify_about_post(post) if user.reputation > 100: post.up_votes += 1 post.score += 1 post.ranking = post.post_ranking(post.score, post.posted_at) db.session.commit() return post # All the following post/comment ranking math is explained at https://medium.com/hacking-and-gonzo/how-reddit-ranking-algorithms-work-ef111e33d0d9 epoch = datetime(1970, 1, 1) @classmethod def epoch_seconds(self, date): td = date - self.epoch return td.days * 86400 + td.seconds + (float(td.microseconds) / 1000000) def delete_dependencies(self): db.session.query(PostBookmark).filter(PostBookmark.post_id == self.id).delete() db.session.query(PollChoiceVote).filter(PollChoiceVote.post_id == self.id).delete() db.session.query(PollChoice).filter(PollChoice.post_id == self.id).delete() db.session.query(Poll).filter(Poll.post_id == self.id).delete() db.session.query(Report).filter(Report.suspect_post_id == self.id).delete() db.session.execute(text('DELETE FROM "post_reply_vote" WHERE post_reply_id IN (SELECT id FROM post_reply WHERE post_id = :post_id)'), {'post_id': self.id}) db.session.execute(text('DELETE FROM "post_reply" WHERE post_id = :post_id'), {'post_id': self.id}) db.session.execute(text('DELETE FROM "post_vote" WHERE post_id = :post_id'), {'post_id': self.id}) if self.image_id: file = File.query.get(self.image_id) file.delete_from_disk() def youtube_embed(self, rel=True) -> str: if self.url: parsed_url = urlparse(self.url) query_params = parse_qs(parsed_url.query) if 'v' in query_params: video_id = query_params.pop('v')[0] if rel: query_params['rel'] = '0' new_query = urlencode(query_params, doseq=True) return f'{video_id}?{new_query}' if '/shorts/' in parsed_url.path: video_id = parsed_url.path.split('/shorts/')[1].split('/')[0] if 't' in query_params: query_params['start'] = query_params.pop('t')[0] if rel: query_params['rel'] = '0' new_query = urlencode(query_params, doseq=True) return f'{video_id}?{new_query}' return '' def youtube_video_id(self) -> str: if self.url: parsed_url = urlparse(self.url) query_params = parse_qs(parsed_url.query) if 'v' in query_params: return query_params['v'][0] if '/shorts/' in parsed_url.path: video_id = parsed_url.path.split('/shorts/')[1].split('/')[0] return f'{video_id}' return '' def peertube_embed(self): if self.url: return self.url.replace('watch', 'embed') def profile_id(self): if self.ap_id: return self.ap_id else: return f"https://{current_app.config['SERVER_NAME']}/post/{self.id}" def public_url(self): return self.profile_id() def blocked_by_content_filter(self, content_filters): lowercase_title = self.title.lower() for name, keywords in content_filters.items() if content_filters else {}: for keyword in keywords: if keyword in lowercase_title: return name return False def posted_at_localized(self, sort, locale): # some locales do not have a definition for 'weeks' so are unable to display some dates in some languages. Fall back to english for those languages. try: return arrow.get(self.last_active if sort == 'active' else self.posted_at).humanize(locale=locale) except ValueError as v: return arrow.get(self.last_active if sort == 'active' else self.posted_at).humanize(locale='en') def notify_new_replies(self, user_id: int) -> bool: existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_POST).first() return existing_notification is not None def language_code(self): if self.language_id: return self.language.code else: return 'en' def language_name(self): if self.language_id: return self.language.name else: return 'English' def tags_for_activitypub(self): return_value = [] for tag in self.tags: return_value.append({'type': 'Hashtag', 'href': f'https://{current_app.config["SERVER_NAME"]}/tag/{tag.name}', 'name': f'#{tag.name}'}) return return_value def post_reply_count_recalculate(self): self.post_reply_count = db.session.execute(text('SELECT COUNT(id) as c FROM "post_reply" WHERE post_id = :post_id AND deleted is false'), {'post_id': self.id}).scalar() # All the following post/comment ranking math is explained at https://medium.com/hacking-and-gonzo/how-reddit-ranking-algorithms-work-ef111e33d0d9 epoch = datetime(1970, 1, 1) def epoch_seconds(self, date): td = date - self.epoch return td.days * 86400 + td.seconds + (float(td.microseconds) / 1000000) # All the following post/comment ranking math is explained at https://medium.com/hacking-and-gonzo/how-reddit-ranking-algorithms-work-ef111e33d0d9 def post_ranking(self, score, date: datetime): if date is None: date = datetime.utcnow() if score is None: score = 1 order = math.log(max(abs(score), 1), 10) sign = 1 if score > 0 else -1 if score < 0 else 0 seconds = self.epoch_seconds(date) - 1685766018 return round(sign * order + seconds / 45000, 7) def vote(self, user: User, vote_direction: str): existing_vote = PostVote.query.filter_by(user_id=user.id, post_id=self.id).first() if existing_vote and vote_direction == 'reversal': # api sends '1' for upvote, '-1' for downvote, and '0' for reversal if existing_vote.effect == 1: vote_direction = 'upvote' elif existing_vote.effect == -1: vote_direction = 'downvote' assert vote_direction == 'upvote' or vote_direction == 'downvote' undo = None if existing_vote: if not self.community.low_quality: self.author.reputation -= existing_vote.effect if existing_vote.effect > 0: # previous vote was up if vote_direction == 'upvote': # new vote is also up, so remove it db.session.delete(existing_vote) self.up_votes -= 1 self.score -= existing_vote.effect # score - (+1) = score-1 undo = 'Like' else: # new vote is down while previous vote was up, so reverse their previous vote existing_vote.effect = -1 self.up_votes -= 1 self.down_votes += 1 self.score += existing_vote.effect * 2 # score + (-2) = score-2 else: # previous vote was down if vote_direction == 'downvote': # new vote is also down, so remove it db.session.delete(existing_vote) self.down_votes -= 1 self.score -= existing_vote.effect # score - (-1) = score+1 undo = 'Dislike' else: # new vote is up while previous vote was down, so reverse their previous vote existing_vote.effect = 1 self.up_votes += 1 self.down_votes -= 1 self.score += existing_vote.effect * 2 # score + (+2) = score+2 db.session.commit() else: if vote_direction == 'upvote': effect = Instance.weight(user.ap_domain) spicy_effect = effect # Make 'hot' sort more spicy by amplifying the effect of early upvotes if self.up_votes + self.down_votes <= 10: spicy_effect = effect * current_app.config['SPICY_UNDER_10'] elif self.up_votes + self.down_votes <= 30: spicy_effect = effect * current_app.config['SPICY_UNDER_30'] elif self.up_votes + self.down_votes <= 60: spicy_effect = effect * current_app.config['SPICY_UNDER_60'] if user.cannot_vote(): effect = spicy_effect = 0 self.up_votes += 1 self.score += spicy_effect # score + (+1) = score+1 else: effect = -1.0 spicy_effect = effect self.down_votes += 1 # Make 'hot' sort more spicy by amplifying the effect of early downvotes if self.up_votes + self.down_votes <= 30: spicy_effect *= current_app.config['SPICY_UNDER_30'] elif self.up_votes + self.down_votes <= 60: spicy_effect *= current_app.config['SPICY_UNDER_60'] if user.cannot_vote(): effect = spicy_effect = 0 self.score += spicy_effect # score + (-1) = score-1 vote = PostVote(user_id=user.id, post_id=self.id, author_id=self.author.id, effect=effect) # upvotes do not increase reputation in low quality communities if self.community.low_quality and effect > 0: effect = 0 self.author.reputation += effect db.session.add(vote) user.last_seen = utcnow() db.session.commit() if not user.banned: self.ranking = self.post_ranking(self.score, self.created_at) user.recalculate_attitude() db.session.commit() return undo class PostReply(db.Model): query_class = FullTextSearchQuery id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) domain_id = db.Column(db.Integer, db.ForeignKey('domain.id'), index=True) image_id = db.Column(db.Integer, db.ForeignKey('file.id'), index=True) parent_id = db.Column(db.Integer, index=True) root_id = db.Column(db.Integer) depth = db.Column(db.Integer, default=0) instance_id = db.Column(db.Integer, db.ForeignKey('instance.id'), index=True) body = db.Column(db.Text) body_html = db.Column(db.Text) body_html_safe = db.Column(db.Boolean, default=False) score = db.Column(db.Integer, default=0, index=True) # used for 'top' sorting nsfw = db.Column(db.Boolean, default=False) nsfl = db.Column(db.Boolean, default=False) notify_author = db.Column(db.Boolean, default=True) created_at = db.Column(db.DateTime, index=True, default=utcnow) posted_at = db.Column(db.DateTime, index=True, default=utcnow) deleted = db.Column(db.Boolean, default=False, index=True) deleted_by = db.Column(db.Integer, index=True) ip = db.Column(db.String(50)) from_bot = db.Column(db.Boolean, default=False) up_votes = db.Column(db.Integer, default=0) down_votes = db.Column(db.Integer, default=0) ranking = db.Column(db.Float, default=0.0, index=True) # used for 'hot' sorting language_id = db.Column(db.Integer, db.ForeignKey('language.id'), index=True) edited_at = db.Column(db.DateTime) reports = db.Column(db.Integer, default=0) # how many times this post has been reported. Set to -1 to ignore reports ap_id = db.Column(db.String(255), index=True) ap_create_id = db.Column(db.String(100)) ap_announce_id = db.Column(db.String(100)) search_vector = db.Column(TSVectorType('body')) author = db.relationship('User', lazy='joined', foreign_keys=[user_id], single_parent=True, overlaps="post_replies") community = db.relationship('Community', lazy='joined', overlaps='replies', foreign_keys=[community_id]) language = db.relationship('Language', foreign_keys=[language_id]) @classmethod def new(cls, user: User, post: Post, in_reply_to, body, body_html, notify_author, language_id, request_json: dict = None, announce_id=None): from app.utils import shorten_string, blocked_phrases, recently_upvoted_post_replies, reply_already_exists, reply_is_just_link_to_gif_reaction, reply_is_stupid from app.activitypub.util import notify_about_post_reply if not post.comments_enabled: raise Exception('Comments are disabled on this post') if in_reply_to is not None: parent_id = in_reply_to.id depth = in_reply_to.depth + 1 else: parent_id = None depth = 0 reply = PostReply(user_id=user.id, post_id=post.id, parent_id=parent_id, depth=depth, community_id=post.community.id, body=body, body_html=body_html, body_html_safe=True, from_bot=user.bot, nsfw=post.nsfw, nsfl=post.nsfl, notify_author=notify_author, instance_id=user.instance_id, language_id=language_id, ap_id=request_json['object']['id'] if request_json else None, ap_create_id=request_json['id'] if request_json else None, ap_announce_id=announce_id) if reply.body: for blocked_phrase in blocked_phrases(): if blocked_phrase in reply.body: raise Exception('Blocked phrase in comment') if in_reply_to is None or in_reply_to.parent_id is None: notification_target = post else: notification_target = PostReply.query.get(in_reply_to.parent_id) if notification_target.author.has_blocked_user(reply.user_id): raise Exception('Replier blocked') if reply_already_exists(user_id=user.id, post_id=post.id, parent_id=reply.parent_id, body=reply.body): raise Exception('Duplicate reply') if reply_is_just_link_to_gif_reaction(reply.body): user.reputation -= 1 raise Exception('Gif comment ignored') if reply_is_stupid(reply.body): raise Exception('Low quality reply') db.session.add(reply) db.session.commit() # Notify subscribers notify_about_post_reply(in_reply_to, reply) # Subscribe to own comment if notify_author: new_notification = NotificationSubscription(name=shorten_string(_('Replies to my comment on %(post_title)s', post_title=post.title), 50), user_id=user.id, entity_id=reply.id, type=NOTIF_REPLY) db.session.add(new_notification) # upvote own reply reply.score = 1 reply.up_votes = 1 reply.ranking = PostReply.confidence(1, 0) vote = PostReplyVote(user_id=user.id, post_reply_id=reply.id, author_id=user.id, effect=1) db.session.add(vote) if user.is_local(): cache.delete_memoized(recently_upvoted_post_replies, user.id) reply.ap_id = reply.profile_id() if user.reputation > 100: reply.up_votes += 1 reply.score += 1 reply.ranking += 1 elif user.reputation < -100: reply.score -= 1 reply.ranking -= 1 if not user.bot: post.reply_count += 1 post.community.post_reply_count += 1 post.community.last_active = post.last_active = utcnow() user.post_reply_count += 1 db.session.commit() return reply def language_code(self): if self.language_id: return self.language.code else: return 'en' def language_name(self): if self.language_id: return self.language.name else: return 'English' def is_local(self): return self.ap_id is None or self.ap_id.startswith('https://' + current_app.config['SERVER_NAME']) @classmethod def get_by_ap_id(cls, ap_id): return cls.query.filter_by(ap_id=ap_id).first() def profile_id(self): if self.ap_id: return self.ap_id else: return f"https://{current_app.config['SERVER_NAME']}/comment/{self.id}" def public_url(self): return self.profile_id() def posted_at_localized(self, locale): try: return arrow.get(self.posted_at).humanize(locale=locale) except ValueError as v: return arrow.get(self.posted_at).humanize(locale='en') # the ap_id of the parent object, whether it's another PostReply or a Post def in_reply_to(self): if self.parent_id is None: return self.post.ap_id else: parent = PostReply.query.get(self.parent_id) return parent.ap_id # the AP profile of the person who wrote the parent object, which could be another PostReply or a Post def to(self): if self.parent_id is None: return self.post.author.public_url() else: parent = PostReply.query.get(self.parent_id) return parent.author.public_url() def delete_dependencies(self): for child_reply in self.child_replies(): child_reply.delete_dependencies() db.session.delete(child_reply) db.session.query(PostReplyBookmark).filter(PostReplyBookmark.post_reply_id == self.id).delete() db.session.query(Report).filter(Report.suspect_post_reply_id == self.id).delete() db.session.execute(text('DELETE FROM post_reply_vote WHERE post_reply_id = :post_reply_id'), {'post_reply_id': self.id}) if self.image_id: file = File.query.get(self.image_id) file.delete_from_disk() def child_replies(self): return PostReply.query.filter_by(parent_id=self.id).all() def has_replies(self): reply = PostReply.query.filter_by(parent_id=self.id).filter(PostReply.deleted == False).first() return reply is not None def blocked_by_content_filter(self, content_filters): lowercase_body = self.body.lower() for name, keywords in content_filters.items() if content_filters else {}: for keyword in keywords: if keyword in lowercase_body: return name return False def notify_new_replies(self, user_id: int) -> bool: existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_REPLY).first() return existing_notification is not None # used for ranking comments @classmethod def _confidence(cls, ups, downs): n = ups + downs if n == 0: return 0.0 z = 1.281551565545 p = float(ups) / n left = p + 1 / (2 * n) * z * z right = z * math.sqrt(p * (1 - p) / n + z * z / (4 * n * n)) under = 1 + 1 / n * z * z return (left - right) / under @classmethod def confidence(cls, ups, downs) -> float: if ups is None or ups < 0: ups = 0 if downs is None or downs < 0: downs = 0 if ups + downs == 0: return 0.0 else: return cls._confidence(ups, downs) def vote(self, user: User, vote_direction: str): existing_vote = PostReplyVote.query.filter_by(user_id=user.id, post_reply_id=self.id).first() if existing_vote and vote_direction == 'reversal': # api sends '1' for upvote, '-1' for downvote, and '0' for reversal if existing_vote.effect == 1: vote_direction = 'upvote' elif existing_vote.effect == -1: vote_direction = 'downvote' assert vote_direction == 'upvote' or vote_direction == 'downvote' undo = None if existing_vote: if existing_vote.effect > 0: # previous vote was up if vote_direction == 'upvote': # new vote is also up, so remove it db.session.delete(existing_vote) self.up_votes -= 1 self.score -= 1 undo = 'Like' else: # new vote is down while previous vote was up, so reverse their previous vote existing_vote.effect = -1 self.up_votes -= 1 self.down_votes += 1 self.score -= 2 else: # previous vote was down if vote_direction == 'downvote': # new vote is also down, so remove it db.session.delete(existing_vote) self.down_votes -= 1 self.score += 1 undo = 'Dislike' else: # new vote is up while previous vote was down, so reverse their previous vote existing_vote.effect = 1 self.up_votes += 1 self.down_votes -= 1 self.score += 2 else: if user.cannot_vote(): effect = 0 else: effect = 1 if vote_direction == 'upvote': self.up_votes += 1 else: effect = effect * -1 self.down_votes += 1 self.score += effect vote = PostReplyVote(user_id=user.id, post_reply_id=self.id, author_id=self.author.id, effect=effect) self.author.reputation += effect db.session.add(vote) user.last_seen = utcnow() self.ranking = PostReply.confidence(self.up_votes, self.down_votes) user.recalculate_attitude() db.session.commit() return undo class Domain(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(255), index=True) post_count = db.Column(db.Integer, default=0) banned = db.Column(db.Boolean, default=False, index=True) # Domains can be banned site-wide (by admin) or DomainBlock'ed by users notify_mods = db.Column(db.Boolean, default=False, index=True) notify_admins = db.Column(db.Boolean, default=False, index=True) def blocked_by(self, user): block = DomainBlock.query.filter_by(domain_id=self.id, user_id=user.id).first() return block is not None def purge_content(self): files = File.query.join(Post).filter(Post.domain_id == self.id).all() for file in files: file.delete_from_disk() posts = Post.query.filter_by(domain_id=self.id).all() for post in posts: post.delete_dependencies() db.session.delete(post) db.session.commit() class DomainBlock(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) domain_id = db.Column(db.Integer, db.ForeignKey('domain.id'), primary_key=True) created_at = db.Column(db.DateTime, default=utcnow) class CommunityBlock(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) created_at = db.Column(db.DateTime, default=utcnow) class CommunityMember(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) is_moderator = db.Column(db.Boolean, default=False) is_owner = db.Column(db.Boolean, default=False) is_banned = db.Column(db.Boolean, default=False, index=True) notify_new_posts = db.Column(db.Boolean, default=False) created_at = db.Column(db.DateTime, default=utcnow) class CommunityWikiPage(db.Model): id = db.Column(db.Integer, primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) slug = db.Column(db.String(100), index=True) title = db.Column(db.String(255)) body = db.Column(db.Text) body_html = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) edited_at = db.Column(db.DateTime, default=utcnow) who_can_edit = db.Column(db.Integer, default=0) # 0 = mods & admins, 1 = trusted, 2 = community members, 3 = anyone revisions = db.relationship('CommunityWikiPageRevision', backref=db.backref('page'), cascade='all,delete', lazy='dynamic') def can_edit(self, user: User, community: Community): if user.is_anonymous: return False if self.who_can_edit == 0: if user.is_admin() or user.is_staff() or community.is_moderator(user): return True elif self.who_can_edit == 1: if user.is_admin() or user.is_staff() or community.is_moderator(user) or user.trustworthy(): return True elif self.who_can_edit == 2: if user.is_admin() or user.is_staff() or community.is_moderator(user) or user.trustworthy() or community.is_member(user): return True elif self.who_can_edit == 3: return True return False class CommunityWikiPageRevision(db.Model): id = db.Column(db.Integer, primary_key=True) wiki_page_id = db.Column(db.Integer, db.ForeignKey('community_wiki_page.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) title = db.Column(db.String(255)) body = db.Column(db.Text) body_html = db.Column(db.Text) edited_at = db.Column(db.DateTime, default=utcnow) author = db.relationship('User', lazy='joined', foreign_keys=[user_id]) class UserFollower(db.Model): local_user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) remote_user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) is_accepted = db.Column(db.Boolean, default=True) # flip to ban remote user / reject follow is_inward = db.Column(db.Boolean, default=True) # true = remote user is following a local one created_at = db.Column(db.DateTime, default=utcnow) # people banned from communities class CommunityBan(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) # person who is banned, not the banner community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) banned_by = db.Column(db.Integer, db.ForeignKey('user.id')) reason = db.Column(db.String(50)) created_at = db.Column(db.DateTime, default=utcnow) ban_until = db.Column(db.DateTime) class UserNote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) target_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) body = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) class UserBlock(db.Model): id = db.Column(db.Integer, primary_key=True) blocker_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) blocked_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class Settings(db.Model): name = db.Column(db.String(50), primary_key=True) value = db.Column(db.String(1024)) class Interest(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) communities = db.Column(db.Text) class CommunityJoinRequest(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) class UserFollowRequest(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) follow_id = db.Column(db.Integer, db.ForeignKey('user.id')) class UserRegistration(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) answer = db.Column(db.String(512)) status = db.Column(db.Integer, default=0, index=True) # 0 = unapproved, 1 = approved created_at = db.Column(db.DateTime, default=utcnow) approved_at = db.Column(db.DateTime) approved_by = db.Column(db.Integer, db.ForeignKey('user.id')) user = db.relationship('User', foreign_keys=[user_id], lazy='joined') class PostVote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) author_id = db.Column(db.Integer, db.ForeignKey('user.id')) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) effect = db.Column(db.Float, index=True) created_at = db.Column(db.DateTime, default=utcnow) post = db.relationship('Post', foreign_keys=[post_id]) class PostReplyVote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # who voted author_id = db.Column(db.Integer, db.ForeignKey('user.id')) # the author of the reply voted on - who's reputation is affected post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id'), index=True) effect = db.Column(db.Float) created_at = db.Column(db.DateTime, default=utcnow) # save every activity to a log, to aid debugging class ActivityPubLog(db.Model): id = db.Column(db.Integer, primary_key=True) direction = db.Column(db.String(3)) # 'in' or 'out' activity_id = db.Column(db.String(256), index=True) activity_type = db.Column(db.String(50)) # e.g. 'Follow', 'Accept', 'Like', etc activity_json = db.Column(db.Text) # the full json of the activity result = db.Column(db.String(10)) # 'success' or 'failure' exception_message = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) class Filter(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(50)) filter_home = db.Column(db.Boolean, default=True) filter_posts = db.Column(db.Boolean, default=True) filter_replies = db.Column(db.Boolean, default=False) hide_type = db.Column(db.Integer, default=0) # 0 = hide with warning, 1 = hide completely user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) expire_after = db.Column(db.Date) keywords = db.Column(db.String(500)) def keywords_string(self): if self.keywords is None or self.keywords == '': return '' split_keywords = [kw.strip() for kw in self.keywords.split('\n')] return ', '.join(split_keywords) class Role(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) weight = db.Column(db.Integer, default=0) permissions = db.relationship('RolePermission') class RolePermission(db.Model): role_id = db.Column(db.Integer, db.ForeignKey('role.id'), primary_key=True) permission = db.Column(db.String, primary_key=True, index=True) class Notification(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(50)) url = db.Column(db.String(512)) read = db.Column(db.Boolean, default=False) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # who the notification should go to author_id = db.Column(db.Integer, db.ForeignKey('user.id')) # the person who caused the notification to happen created_at = db.Column(db.DateTime, default=utcnow) class Report(db.Model): id = db.Column(db.Integer, primary_key=True) reasons = db.Column(db.String(256)) description = db.Column(db.String(256)) status = db.Column(db.Integer, default=0) # 0 = new, 1 = escalated to admin, 2 = being appealed, 3 = resolved, 4 = discarded type = db.Column(db.Integer, default=0) # 0 = user, 1 = post, 2 = reply, 3 = community, 4 = conversation reporter_id = db.Column(db.Integer, db.ForeignKey('user.id')) suspect_community_id = db.Column(db.Integer, db.ForeignKey('community.id')) suspect_user_id = db.Column(db.Integer, db.ForeignKey('user.id')) suspect_post_id = db.Column(db.Integer, db.ForeignKey('post.id')) suspect_post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id')) suspect_conversation_id = db.Column(db.Integer, db.ForeignKey('conversation.id')) in_community_id = db.Column(db.Integer, db.ForeignKey('community.id')) source_instance_id = db.Column(db.Integer, db.ForeignKey('instance.id')) # the instance of the reporter. mostly used to distinguish between local (instance 1) and remote reports created_at = db.Column(db.DateTime, default=utcnow) updated = db.Column(db.DateTime, default=utcnow) # textual representation of self.type def type_text(self): types = ('User', 'Post', 'Comment', 'Community', 'Conversation') if self.type is None: return '' else: return types[self.type] def is_local(self): return self.source_instance_id == 1 class NotificationSubscription(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(256)) # to avoid needing to look up the thing subscribed to via entity_id type = db.Column(db.Integer, default=0, index=True) # see constants.py for possible values: NOTIF_* entity_id = db.Column(db.Integer, index=True) # ID of the user, post, community, etc being subscribed to user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # To whom this subscription belongs created_at = db.Column(db.DateTime, default=utcnow) # Perhaps very old subscriptions can be automatically deleted class Poll(db.Model): post_id = db.Column(db.Integer, db.ForeignKey('post.id'), primary_key=True) end_poll = db.Column(db.DateTime) mode = db.Column(db.String(10)) # 'single' or 'multiple' determines whether people can vote for one or multiple options local_only = db.Column(db.Boolean) latest_vote = db.Column(db.DateTime) def has_voted(self, user_id): existing_vote = PollChoiceVote.query.filter(PollChoiceVote.user_id == user_id, PollChoiceVote.post_id == self.post_id).first() return existing_vote is not None def vote_for_choice(self, choice_id, user_id): existing_vote = PollChoiceVote.query.filter(PollChoiceVote.user_id == user_id, PollChoiceVote.choice_id == choice_id).first() if not existing_vote: new_vote = PollChoiceVote(choice_id=choice_id, user_id=user_id, post_id=self.post_id) db.session.add(new_vote) choice = PollChoice.query.get(choice_id) choice.num_votes += 1 self.latest_vote = datetime.utcnow() db.session.commit() def total_votes(self): return db.session.execute(text('SELECT SUM(num_votes) as s FROM "poll_choice" WHERE post_id = :post_id'), {'post_id': self.post_id}).scalar() class PollChoice(db.Model): id = db.Column(db.Integer, primary_key=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) choice_text = db.Column(db.String(200)) sort_order = db.Column(db.Integer) num_votes = db.Column(db.Integer, default=0) def percentage(self, poll_total_votes): return math.ceil(self.num_votes / poll_total_votes * 100) class PollChoiceVote(db.Model): choice_id = db.Column(db.Integer, db.ForeignKey('poll_choice.id'), primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class PostBookmark(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class PostReplyBookmark(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class ModLog(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) type = db.Column(db.String(10)) # 'mod' or 'admin' action = db.Column(db.String(30)) # 'removing post', 'banning from community', etc reason = db.Column(db.String(512)) link = db.Column(db.String(512)) link_text = db.Column(db.String(512)) public = db.Column(db.Boolean, default=False) created_at = db.Column(db.DateTime, default=utcnow) community = db.relationship('Community', lazy='joined', foreign_keys=[community_id]) author = db.relationship('User', lazy='joined', foreign_keys=[user_id]) action_map = { 'add_mod': _l('Added moderator'), 'remove_mod': _l('Removed moderator'), 'featured_post': _l('Featured post'), 'unfeatured_post': _l('Unfeatured post'), 'delete_post': _l('Deleted post'), 'restore_post': _l('Un-deleted post'), 'delete_post_reply': _l('Deleted comment'), 'restore_post_reply': _l('Un-deleted comment'), 'delete_community': _l('Deleted community'), 'delete_user': _l('Deleted account'), 'undelete_user': _l('Restored account'), 'ban_user': _l('Banned account'), 'unban_user': _l('Un-banned account'), } def action_to_str(self): if self.action in self.action_map: return self.action_map[self.action] else: return self.action class IpBan(db.Model): id = db.Column(db.Integer, primary_key=True) ip_address = db.Column(db.String(50), index=True) notes = db.Column(db.String(150)) created_at = db.Column(db.DateTime, default=utcnow) class Site(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(256)) description = db.Column(db.String(256)) icon_id = db.Column(db.Integer, db.ForeignKey('file.id')) sidebar = db.Column(db.Text, default='') legal_information = db.Column(db.Text, default='') public_key = db.Column(db.Text) private_key = db.Column(db.Text) enable_downvotes = db.Column(db.Boolean, default=True) allow_local_image_posts = db.Column(db.Boolean, default=True) remote_image_cache_days = db.Column(db.Integer, default=30) enable_nsfw = db.Column(db.Boolean, default=False) enable_nsfl = db.Column(db.Boolean, default=False) community_creation_admin_only = db.Column(db.Boolean, default=False) reports_email_admins = db.Column(db.Boolean, default=True) registration_mode = db.Column(db.String(20), default='Closed') # possible values: Open, RequireApplication, Closed application_question = db.Column(db.Text, default='') allow_or_block_list = db.Column(db.Integer, default=2) # 1 = allow list, 2 = block list allowlist = db.Column(db.Text, default='') blocklist = db.Column(db.Text, default='') blocked_phrases = db.Column(db.Text, default='') # discard incoming content with these phrases auto_decline_referrers = db.Column(db.Text, default='rdrama.net\nahrefs.com') # automatically decline registration requests if the referrer is one of these created_at = db.Column(db.DateTime, default=utcnow) updated = db.Column(db.DateTime, default=utcnow) last_active = db.Column(db.DateTime, default=utcnow) log_activitypub_json = db.Column(db.Boolean, default=False) default_theme = db.Column(db.String(20), default='') contact_email = db.Column(db.String(255), default='') about = db.Column(db.Text, default='') logo = db.Column(db.String(40), default='') logo_152 = db.Column(db.String(40), default='') logo_32 = db.Column(db.String(40), default='') logo_16 = db.Column(db.String(40), default='') show_inoculation_block = db.Column(db.Boolean, default=True) @staticmethod def admins() -> List[User]: return User.query.filter_by(deleted=False, banned=False).join(user_role).filter(user_role.c.role_id == ROLE_ADMIN).order_by(User.id).all() @staticmethod def staff() -> List[User]: return User.query.filter_by(deleted=False, banned=False).join(user_role).filter(user_role.c.role_id == ROLE_STAFF).order_by(User.id).all() #class IngressQueue(db.Model): # id = db.Column(db.Integer, primary_key=True) # waiting_for = db.Column(db.String(255), index=True) # The AP ID of the object we're waiting to be created before this Activity can be ingested # activity_pub_log_id = db.Column(db.Integer, db.ForeignKey('activity_pub_log.id')) # The original Activity that failed because some target object does not exist # ap_date_published = db.Column(db.DateTime, default=utcnow) # The value of the datePublished field on the Activity # created_at = db.Column(db.DateTime, default=utcnow) # expires = db.Column(db.DateTime, default=utcnow) # When to give up waiting and delete this row # # @login.user_loader def load_user(id): return User.query.get(int(id))