') or request_json['object']['content'].startswith('
')): request_json['object']['content'] = '' + request_json['object']['content'] + '
' post.body_html = allowlist_html(request_json['object']['content']) post.body = html_to_text(post.body_html) if microblog: autogenerated_title = microblog_content_to_title(post.body_html) if len(autogenerated_title) < 20: title = '[Microblog] ' + autogenerated_title.strip() else: title = autogenerated_title.strip() if '[NSFL]' in title.upper() or '(NSFL)' in title.upper(): post.nsfl = True if '[NSFW]' in title.upper() or '(NSFW)' in title.upper(): post.nsfw = True post.title = title # Discard post if it contains certain phrases. Good for stopping spam floods. blocked_phrases_list = blocked_phrases() for blocked_phrase in blocked_phrases_list: if blocked_phrase in post.title: return None if post.body: for blocked_phrase in blocked_phrases_list: if blocked_phrase in post.body: return None if ('attachment' in request_json['object'] and isinstance(request_json['object']['attachment'], list) and len(request_json['object']['attachment']) > 0 and 'type' in request_json['object']['attachment'][0]): alt_text = None if request_json['object']['attachment'][0]['type'] == 'Link': post.url = request_json['object']['attachment'][0]['href'] # Lemmy < 0.19.4 if request_json['object']['attachment'][0]['type'] == 'Document': post.url = request_json['object']['attachment'][0]['url'] # Mastodon if 'name' in request_json['object']['attachment'][0]: alt_text = request_json['object']['attachment'][0]['name'] if request_json['object']['attachment'][0]['type'] == 'Image': post.url = request_json['object']['attachment'][0]['url'] # PixelFed, PieFed, Lemmy >= 0.19.4 if 'name' in request_json['object']['attachment'][0]: alt_text = request_json['object']['attachment'][0]['name'] if 'attachment' in request_json['object'] and isinstance(request_json['object']['attachment'], dict): # a.gup.pe (Mastodon) alt_text = None post.url = request_json['object']['attachment']['url'] if post.url: if is_image_url(post.url): post.type = constants.POST_TYPE_IMAGE image = File(source_url=post.url) if alt_text: image.alt_text = alt_text db.session.add(image) post.image = image elif is_video_url(post.url): # youtube is detected later post.type = constants.POST_TYPE_VIDEO image = File(source_url=post.url) db.session.add(image) post.image = image else: post.type = constants.POST_TYPE_LINK domain = domain_from_url(post.url) # notify about links to banned websites. already_notified = set() # often admins and mods are the same people - avoid notifying them twice if domain.notify_mods: for community_member in post.community.moderators(): notify = Notification(title='Suspicious content', url=post.ap_id, user_id=community_member.user_id, author_id=user.id) db.session.add(notify) already_notified.add(community_member.user_id) if domain.notify_admins: for admin in Site.admins(): if admin.id not in already_notified: notify = Notification(title='Suspicious content', url=post.ap_id, user_id=admin.id, author_id=user.id) db.session.add(notify) if domain.banned or domain.name.endswith('.pages.dev'): raise Exception(domain.name + ' is blocked by admin') else: domain.post_count += 1 post.domain = domain if post is not None: if request_json['object']['type'] == 'Video': post.type = constants.POST_TYPE_VIDEO post.url = request_json['object']['id'] if 'icon' in request_json['object'] and isinstance(request_json['object']['icon'], list): icon = File(source_url=request_json['object']['icon'][-1]['url']) db.session.add(icon) post.image = icon # Language. Lemmy uses 'language' while Mastodon has 'contentMap' if 'language' in request_json['object'] and isinstance(request_json['object']['language'], dict): language = find_language_or_create(request_json['object']['language']['identifier'], request_json['object']['language']['name']) post.language = language elif 'contentMap' in request_json['object'] and isinstance(request_json['object']['contentMap'], dict): language = find_language(next(iter(request_json['object']['contentMap']))) post.language_id = language.id if language else None if 'licence' in request_json['object'] and isinstance(request_json['object']['licence'], dict): licence = find_licence_or_create(request_json['object']['licence']['name']) post.licence = licence if 'tag' in request_json['object'] and isinstance(request_json['object']['tag'], list): for json_tag in request_json['object']['tag']: if json_tag and json_tag['type'] == 'Hashtag': if json_tag['name'][1:].lower() != community.name.lower(): # Lemmy adds the community slug as a hashtag on every post in the community, which we want to ignore hashtag = find_hashtag_or_create(json_tag['name']) if hashtag: post.tags.append(hashtag) if 'image' in request_json['object'] and post.image is None: image = File(source_url=request_json['object']['image']['url']) db.session.add(image) post.image = image if post.image is None and post.type == constants.POST_TYPE_LINK: # This is a link post but the source instance has not provided a thumbnail image # Let's see if we can do better than the source instance did! tn_url = post.url if tn_url[:32] == 'https://www.youtube.com/watch?v=': tn_url = 'https://youtu.be/' + tn_url[ 32:43] # better chance of thumbnail from youtu.be than youtube.com opengraph = opengraph_parse(tn_url) if opengraph and (opengraph.get('og:image', '') != '' or opengraph.get('og:image:url', '') != ''): filename = opengraph.get('og:image') or opengraph.get('og:image:url') if not filename.startswith('/'): file = File(source_url=filename, alt_text=shorten_string(opengraph.get('og:title'), 295)) post.image = file db.session.add(file) if 'searchableBy' in request_json['object'] and request_json['object']['searchableBy'] != 'https://www.w3.org/ns/activitystreams#Public': post.indexable = False if post.url: post.url = remove_tracking_from_link(post.url) # moved here as changes youtu.be to youtube.com if is_video_hosting_site(post.url): post.type = constants.POST_TYPE_VIDEO db.session.add(post) post.ranking = post.post_ranking(post.score, post.posted_at) community.post_count += 1 community.last_active = utcnow() user.post_count += 1 try: db.session.commit() except IntegrityError: db.session.rollback() return Post.query.filter_by(ap_id=request_json['object']['id'].lower()).one() # Polls need to be processed quite late because they need a post_id to refer to if request_json['object']['type'] == 'Question': post.type = constants.POST_TYPE_POLL mode = 'single' if 'anyOf' in request_json['object']: mode = 'multiple' poll = Poll(post_id=post.id, end_poll=request_json['object']['endTime'], mode=mode, local_only=False) db.session.add(poll) i = 1 for choice_ap in request_json['object']['oneOf' if mode == 'single' else 'anyOf']: new_choice = PollChoice(post_id=post.id, choice_text=choice_ap['name'], sort_order=i) db.session.add(new_choice) i += 1 db.session.commit() if post.image_id: make_image_sizes(post.image_id, 170, 512, 'posts', community.low_quality) # the 512 sized image is for masonry view # Update list of cross posts if post.url: other_posts = Post.query.filter(Post.id != post.id, Post.url == post.url, Post.deleted == False, Post.posted_at > post.posted_at - timedelta(days=6)).all() for op in other_posts: if op.cross_posts is None: op.cross_posts = [post.id] else: op.cross_posts.append(post.id) if post.cross_posts is None: post.cross_posts = [op.id] else: post.cross_posts.append(op.id) db.session.commit() if post.community_id not in communities_banned_from(user.id): notify_about_post(post) if user.reputation > 100: post.up_votes += 1 post.score += 1 post.ranking = post.post_ranking(post.score, post.posted_at) db.session.commit() return post # All the following post/comment ranking math is explained at https://medium.com/hacking-and-gonzo/how-reddit-ranking-algorithms-work-ef111e33d0d9 epoch = datetime(1970, 1, 1) @classmethod def epoch_seconds(self, date): td = date - self.epoch return td.days * 86400 + td.seconds + (float(td.microseconds) / 1000000) def delete_dependencies(self): db.session.query(PostBookmark).filter(PostBookmark.post_id == self.id).delete() db.session.query(PollChoiceVote).filter(PollChoiceVote.post_id == self.id).delete() db.session.query(PollChoice).filter(PollChoice.post_id == self.id).delete() db.session.query(Poll).filter(Poll.post_id == self.id).delete() db.session.query(Report).filter(Report.suspect_post_id == self.id).delete() db.session.execute(text('DELETE FROM "post_vote" WHERE post_id = :post_id'), {'post_id': self.id}) reply_ids = db.session.execute(text('SELECT id FROM "post_reply" WHERE post_id = :post_id'), {'post_id': self.id}).scalars() reply_ids = tuple(reply_ids) if reply_ids: db.session.execute(text('DELETE FROM "post_reply_vote" WHERE post_reply_id IN :reply_ids'), {'reply_ids': reply_ids}) db.session.execute(text('DELETE FROM "post_reply_bookmark" WHERE post_reply_id IN :reply_ids'), {'reply_ids': reply_ids}) db.session.execute(text('DELETE FROM "report" WHERE suspect_post_reply_id IN :reply_ids'), {'reply_ids': reply_ids}) db.session.execute(text('DELETE FROM "post_reply" WHERE post_id = :post_id'), {'post_id': self.id}) self.community.post_reply_count = db.session.execute(text('SELECT COUNT(id) as c FROM "post_reply" WHERE community_id = :community_id AND deleted = false'), {'community_id': self.community_id}).scalar() if self.image_id: file = File.query.get(self.image_id) file.delete_from_disk() def youtube_embed(self, rel=True) -> str: if self.url: parsed_url = urlparse(self.url) query_params = parse_qs(parsed_url.query) if 'v' in query_params: video_id = query_params.pop('v')[0] if rel: query_params['rel'] = '0' new_query = urlencode(query_params, doseq=True) return f'{video_id}?{new_query}' if '/shorts/' in parsed_url.path: video_id = parsed_url.path.split('/shorts/')[1].split('/')[0] if 't' in query_params: query_params['start'] = query_params.pop('t')[0] if rel: query_params['rel'] = '0' new_query = urlencode(query_params, doseq=True) return f'{video_id}?{new_query}' return '' def youtube_video_id(self) -> str: if self.url: parsed_url = urlparse(self.url) query_params = parse_qs(parsed_url.query) if 'v' in query_params: return query_params['v'][0] if '/shorts/' in parsed_url.path: video_id = parsed_url.path.split('/shorts/')[1].split('/')[0] return f'{video_id}' return '' def peertube_embed(self): if self.url: return self.url.replace('watch', 'embed') def profile_id(self): if self.ap_id: return self.ap_id else: return f"https://{current_app.config['SERVER_NAME']}/post/{self.id}" def public_url(self): return self.profile_id() def blocked_by_content_filter(self, content_filters): lowercase_title = self.title.lower() for name, keywords in content_filters.items() if content_filters else {}: for keyword in keywords: if keyword in lowercase_title: return name return False def posted_at_localized(self, sort, locale): # some locales do not have a definition for 'weeks' so are unable to display some dates in some languages. Fall back to english for those languages. try: return arrow.get(self.last_active if sort == 'active' else self.posted_at).humanize(locale=locale) except ValueError as v: return arrow.get(self.last_active if sort == 'active' else self.posted_at).humanize(locale='en') def notify_new_replies(self, user_id: int) -> bool: existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_POST).first() return existing_notification is not None def language_code(self): if self.language_id: return self.language.code else: return 'en' def language_name(self): if self.language_id: return self.language.name else: return 'English' def tags_for_activitypub(self): return_value = [] for tag in self.tags: return_value.append({'type': 'Hashtag', 'href': f'https://{current_app.config["SERVER_NAME"]}/tag/{tag.name}', 'name': f'#{tag.name}'}) return return_value def post_reply_count_recalculate(self): self.post_reply_count = db.session.execute(text('SELECT COUNT(id) as c FROM "post_reply" WHERE post_id = :post_id AND deleted is false'), {'post_id': self.id}).scalar() # All the following post/comment ranking math is explained at https://medium.com/hacking-and-gonzo/how-reddit-ranking-algorithms-work-ef111e33d0d9 epoch = datetime(1970, 1, 1) def epoch_seconds(self, date): td = date - self.epoch return td.days * 86400 + td.seconds + (float(td.microseconds) / 1000000) # All the following post/comment ranking math is explained at https://medium.com/hacking-and-gonzo/how-reddit-ranking-algorithms-work-ef111e33d0d9 def post_ranking(self, score, date: datetime): if date is None: date = datetime.utcnow() if score is None: score = 1 order = math.log(max(abs(score), 1), 10) sign = 1 if score > 0 else -1 if score < 0 else 0 seconds = self.epoch_seconds(date) - 1685766018 return round(sign * order + seconds / 45000, 7) def vote(self, user: User, vote_direction: str): existing_vote = PostVote.query.filter_by(user_id=user.id, post_id=self.id).first() if existing_vote and vote_direction == 'reversal': # api sends '1' for upvote, '-1' for downvote, and '0' for reversal if existing_vote.effect == 1: vote_direction = 'upvote' elif existing_vote.effect == -1: vote_direction = 'downvote' assert vote_direction == 'upvote' or vote_direction == 'downvote' undo = None if existing_vote: if not self.community.low_quality: self.author.reputation -= existing_vote.effect if existing_vote.effect > 0: # previous vote was up if vote_direction == 'upvote': # new vote is also up, so remove it db.session.delete(existing_vote) self.up_votes -= 1 self.score -= existing_vote.effect # score - (+1) = score-1 undo = 'Like' else: # new vote is down while previous vote was up, so reverse their previous vote existing_vote.effect = -1 self.up_votes -= 1 self.down_votes += 1 self.score += existing_vote.effect * 2 # score + (-2) = score-2 else: # previous vote was down if vote_direction == 'downvote': # new vote is also down, so remove it db.session.delete(existing_vote) self.down_votes -= 1 self.score -= existing_vote.effect # score - (-1) = score+1 undo = 'Dislike' else: # new vote is up while previous vote was down, so reverse their previous vote existing_vote.effect = 1 self.up_votes += 1 self.down_votes -= 1 self.score += existing_vote.effect * 2 # score + (+2) = score+2 db.session.commit() else: if vote_direction == 'upvote': effect = Instance.weight(user.ap_domain) spicy_effect = effect # Make 'hot' sort more spicy by amplifying the effect of early upvotes if self.up_votes + self.down_votes <= 10: spicy_effect = effect * current_app.config['SPICY_UNDER_10'] elif self.up_votes + self.down_votes <= 30: spicy_effect = effect * current_app.config['SPICY_UNDER_30'] elif self.up_votes + self.down_votes <= 60: spicy_effect = effect * current_app.config['SPICY_UNDER_60'] if user.cannot_vote(): effect = spicy_effect = 0 self.up_votes += 1 self.score += spicy_effect # score + (+1) = score+1 else: effect = -1.0 spicy_effect = effect self.down_votes += 1 # Make 'hot' sort more spicy by amplifying the effect of early downvotes if self.up_votes + self.down_votes <= 30: spicy_effect *= current_app.config['SPICY_UNDER_30'] elif self.up_votes + self.down_votes <= 60: spicy_effect *= current_app.config['SPICY_UNDER_60'] if user.cannot_vote(): effect = spicy_effect = 0 self.score += spicy_effect # score + (-1) = score-1 vote = PostVote(user_id=user.id, post_id=self.id, author_id=self.author.id, effect=effect) # upvotes do not increase reputation in low quality communities if self.community.low_quality and effect > 0: effect = 0 self.author.reputation += effect db.session.add(vote) user.last_seen = utcnow() db.session.commit() if not user.banned: self.ranking = self.post_ranking(self.score, self.created_at) user.recalculate_attitude() db.session.commit() return undo class PostReply(db.Model): query_class = FullTextSearchQuery id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) domain_id = db.Column(db.Integer, db.ForeignKey('domain.id'), index=True) image_id = db.Column(db.Integer, db.ForeignKey('file.id'), index=True) parent_id = db.Column(db.Integer, index=True) root_id = db.Column(db.Integer) depth = db.Column(db.Integer, default=0) instance_id = db.Column(db.Integer, db.ForeignKey('instance.id'), index=True) body = db.Column(db.Text) body_html = db.Column(db.Text) body_html_safe = db.Column(db.Boolean, default=False) score = db.Column(db.Integer, default=0, index=True) # used for 'top' sorting nsfw = db.Column(db.Boolean, default=False) nsfl = db.Column(db.Boolean, default=False) notify_author = db.Column(db.Boolean, default=True) created_at = db.Column(db.DateTime, index=True, default=utcnow) posted_at = db.Column(db.DateTime, index=True, default=utcnow) deleted = db.Column(db.Boolean, default=False, index=True) deleted_by = db.Column(db.Integer, index=True) ip = db.Column(db.String(50)) from_bot = db.Column(db.Boolean, default=False) up_votes = db.Column(db.Integer, default=0) down_votes = db.Column(db.Integer, default=0) ranking = db.Column(db.Float, default=0.0, index=True) # used for 'hot' sorting language_id = db.Column(db.Integer, db.ForeignKey('language.id'), index=True) edited_at = db.Column(db.DateTime) reports = db.Column(db.Integer, default=0) # how many times this post has been reported. Set to -1 to ignore reports ap_id = db.Column(db.String(255), index=True, unique=True) ap_create_id = db.Column(db.String(100)) ap_announce_id = db.Column(db.String(100)) search_vector = db.Column(TSVectorType('body')) author = db.relationship('User', lazy='joined', foreign_keys=[user_id], single_parent=True, overlaps="post_replies") community = db.relationship('Community', lazy='joined', overlaps='replies', foreign_keys=[community_id]) language = db.relationship('Language', foreign_keys=[language_id]) @classmethod def new(cls, user: User, post: Post, in_reply_to, body, body_html, notify_author, language_id, request_json: dict = None, announce_id=None): from app.utils import shorten_string, blocked_phrases, recently_upvoted_post_replies, reply_already_exists, reply_is_just_link_to_gif_reaction, reply_is_stupid from app.activitypub.util import notify_about_post_reply if not post.comments_enabled: raise Exception('Comments are disabled on this post') if in_reply_to is not None: parent_id = in_reply_to.id depth = in_reply_to.depth + 1 else: parent_id = None depth = 0 reply = PostReply(user_id=user.id, post_id=post.id, parent_id=parent_id, depth=depth, community_id=post.community.id, body=body, body_html=body_html, body_html_safe=True, from_bot=user.bot, nsfw=post.nsfw, nsfl=post.nsfl, notify_author=notify_author, instance_id=user.instance_id, language_id=language_id, ap_id=request_json['object']['id'].lower() if request_json else None, ap_create_id=request_json['id'] if request_json else None, ap_announce_id=announce_id) if reply.body: for blocked_phrase in blocked_phrases(): if blocked_phrase in reply.body: raise Exception('Blocked phrase in comment') if in_reply_to is None or in_reply_to.parent_id is None: notification_target = post else: notification_target = PostReply.query.get(in_reply_to.parent_id) if notification_target.author.has_blocked_user(reply.user_id): raise Exception('Replier blocked') if reply_already_exists(user_id=user.id, post_id=post.id, parent_id=reply.parent_id, body=reply.body): raise Exception('Duplicate reply') if reply_is_just_link_to_gif_reaction(reply.body): user.reputation -= 1 raise Exception('Gif comment ignored') if reply_is_stupid(reply.body): raise Exception('Low quality reply') try: db.session.add(reply) db.session.commit() except IntegrityError: db.session.rollback() return PostReply.query.filter_by(ap_id=request_json['object']['id'].lower()).one() # Notify subscribers notify_about_post_reply(in_reply_to, reply) # Subscribe to own comment if notify_author: new_notification = NotificationSubscription(name=shorten_string(_('Replies to my comment on %(post_title)s', post_title=post.title), 50), user_id=user.id, entity_id=reply.id, type=NOTIF_REPLY) db.session.add(new_notification) # upvote own reply reply.score = 1 reply.up_votes = 1 reply.ranking = PostReply.confidence(1, 0) vote = PostReplyVote(user_id=user.id, post_reply_id=reply.id, author_id=user.id, effect=1) db.session.add(vote) if user.is_local(): cache.delete_memoized(recently_upvoted_post_replies, user.id) reply.ap_id = reply.profile_id() if user.reputation > 100: reply.up_votes += 1 reply.score += 1 reply.ranking += 1 elif user.reputation < -100: reply.score -= 1 reply.ranking -= 1 if not user.bot: post.reply_count += 1 post.community.post_reply_count += 1 post.community.last_active = post.last_active = utcnow() user.post_reply_count += 1 db.session.commit() return reply def language_code(self): if self.language_id: return self.language.code else: return 'en' def language_name(self): if self.language_id: return self.language.name else: return 'English' def is_local(self): return self.ap_id is None or self.ap_id.startswith('https://' + current_app.config['SERVER_NAME']) @classmethod def get_by_ap_id(cls, ap_id): return cls.query.filter_by(ap_id=ap_id.lower()).first() def profile_id(self): if self.ap_id: return self.ap_id else: return f"https://{current_app.config['SERVER_NAME']}/comment/{self.id}" def public_url(self): return self.profile_id() def posted_at_localized(self, locale): try: return arrow.get(self.posted_at).humanize(locale=locale) except ValueError as v: return arrow.get(self.posted_at).humanize(locale='en') # the ap_id of the parent object, whether it's another PostReply or a Post def in_reply_to(self): if self.parent_id is None: return self.post.ap_id else: parent = PostReply.query.get(self.parent_id) return parent.ap_id # the AP profile of the person who wrote the parent object, which could be another PostReply or a Post def to(self): if self.parent_id is None: return self.post.author.public_url() else: parent = PostReply.query.get(self.parent_id) return parent.author.public_url() def delete_dependencies(self): """ The first loop doesn't seem to ever be invoked with the current behaviour. For replies with their own replies: functions which deal with removal don't set reply.deleted and don't call this, and because reply.deleted isn't set, the cli task 7 days later doesn't call this either. The plan is to set reply.deleted whether there's child replies or not (as happens with the API call), so I've commented it out so the current behaviour isn't changed. for child_reply in self.child_replies(): child_reply.delete_dependencies() db.session.delete(child_reply) """ db.session.query(PostReplyBookmark).filter(PostReplyBookmark.post_reply_id == self.id).delete() db.session.query(Report).filter(Report.suspect_post_reply_id == self.id).delete() db.session.execute(text('DELETE FROM post_reply_vote WHERE post_reply_id = :post_reply_id'), {'post_reply_id': self.id}) if self.image_id: file = File.query.get(self.image_id) file.delete_from_disk() def child_replies(self): return PostReply.query.filter_by(parent_id=self.id).all() def has_replies(self): reply = PostReply.query.filter_by(parent_id=self.id).filter(PostReply.deleted == False).first() return reply is not None def blocked_by_content_filter(self, content_filters): lowercase_body = self.body.lower() for name, keywords in content_filters.items() if content_filters else {}: for keyword in keywords: if keyword in lowercase_body: return name return False def notify_new_replies(self, user_id: int) -> bool: existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == self.id, NotificationSubscription.user_id == user_id, NotificationSubscription.type == NOTIF_REPLY).first() return existing_notification is not None # used for ranking comments @classmethod def _confidence(cls, ups, downs): n = ups + downs if n == 0: return 0.0 z = 1.281551565545 p = float(ups) / n left = p + 1 / (2 * n) * z * z right = z * math.sqrt(p * (1 - p) / n + z * z / (4 * n * n)) under = 1 + 1 / n * z * z return (left - right) / under @classmethod def confidence(cls, ups, downs) -> float: if ups is None or ups < 0: ups = 0 if downs is None or downs < 0: downs = 0 if ups + downs == 0: return 0.0 else: return cls._confidence(ups, downs) def vote(self, user: User, vote_direction: str): existing_vote = PostReplyVote.query.filter_by(user_id=user.id, post_reply_id=self.id).first() if existing_vote and vote_direction == 'reversal': # api sends '1' for upvote, '-1' for downvote, and '0' for reversal if existing_vote.effect == 1: vote_direction = 'upvote' elif existing_vote.effect == -1: vote_direction = 'downvote' assert vote_direction == 'upvote' or vote_direction == 'downvote' undo = None if existing_vote: if existing_vote.effect > 0: # previous vote was up if vote_direction == 'upvote': # new vote is also up, so remove it db.session.delete(existing_vote) self.up_votes -= 1 self.score -= 1 undo = 'Like' else: # new vote is down while previous vote was up, so reverse their previous vote existing_vote.effect = -1 self.up_votes -= 1 self.down_votes += 1 self.score -= 2 else: # previous vote was down if vote_direction == 'downvote': # new vote is also down, so remove it db.session.delete(existing_vote) self.down_votes -= 1 self.score += 1 undo = 'Dislike' else: # new vote is up while previous vote was down, so reverse their previous vote existing_vote.effect = 1 self.up_votes += 1 self.down_votes -= 1 self.score += 2 else: if user.cannot_vote(): effect = 0 else: effect = 1 if vote_direction == 'upvote': self.up_votes += 1 else: effect = effect * -1 self.down_votes += 1 self.score += effect vote = PostReplyVote(user_id=user.id, post_reply_id=self.id, author_id=self.author.id, effect=effect) self.author.reputation += effect db.session.add(vote) db.session.commit() user.last_seen = utcnow() self.ranking = PostReply.confidence(self.up_votes, self.down_votes) user.recalculate_attitude() db.session.commit() return undo class Domain(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(255), index=True) post_count = db.Column(db.Integer, default=0) banned = db.Column(db.Boolean, default=False, index=True) # Domains can be banned site-wide (by admin) or DomainBlock'ed by users notify_mods = db.Column(db.Boolean, default=False, index=True) notify_admins = db.Column(db.Boolean, default=False, index=True) def blocked_by(self, user): block = DomainBlock.query.filter_by(domain_id=self.id, user_id=user.id).first() return block is not None def purge_content(self): files = File.query.join(Post).filter(Post.domain_id == self.id).all() for file in files: file.delete_from_disk() posts = Post.query.filter_by(domain_id=self.id).all() for post in posts: post.delete_dependencies() db.session.delete(post) db.session.commit() class DomainBlock(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) domain_id = db.Column(db.Integer, db.ForeignKey('domain.id'), primary_key=True) created_at = db.Column(db.DateTime, default=utcnow) class CommunityBlock(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) created_at = db.Column(db.DateTime, default=utcnow) class CommunityMember(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) is_moderator = db.Column(db.Boolean, default=False) is_owner = db.Column(db.Boolean, default=False) is_banned = db.Column(db.Boolean, default=False, index=True) notify_new_posts = db.Column(db.Boolean, default=False) created_at = db.Column(db.DateTime, default=utcnow) class CommunityWikiPage(db.Model): id = db.Column(db.Integer, primary_key=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) slug = db.Column(db.String(100), index=True) title = db.Column(db.String(255)) body = db.Column(db.Text) body_html = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) edited_at = db.Column(db.DateTime, default=utcnow) who_can_edit = db.Column(db.Integer, default=0) # 0 = mods & admins, 1 = trusted, 2 = community members, 3 = anyone revisions = db.relationship('CommunityWikiPageRevision', backref=db.backref('page'), cascade='all,delete', lazy='dynamic') def can_edit(self, user: User, community: Community): if user.is_anonymous: return False if self.who_can_edit == 0: if user.is_admin() or user.is_staff() or community.is_moderator(user): return True elif self.who_can_edit == 1: if user.is_admin() or user.is_staff() or community.is_moderator(user) or user.trustworthy(): return True elif self.who_can_edit == 2: if user.is_admin() or user.is_staff() or community.is_moderator(user) or user.trustworthy() or community.is_member(user): return True elif self.who_can_edit == 3: return True return False class CommunityWikiPageRevision(db.Model): id = db.Column(db.Integer, primary_key=True) wiki_page_id = db.Column(db.Integer, db.ForeignKey('community_wiki_page.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) title = db.Column(db.String(255)) body = db.Column(db.Text) body_html = db.Column(db.Text) edited_at = db.Column(db.DateTime, default=utcnow) author = db.relationship('User', lazy='joined', foreign_keys=[user_id]) class UserFollower(db.Model): local_user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) remote_user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) is_accepted = db.Column(db.Boolean, default=True) # flip to ban remote user / reject follow is_inward = db.Column(db.Boolean, default=True) # true = remote user is following a local one created_at = db.Column(db.DateTime, default=utcnow) # people banned from communities class CommunityBan(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) # person who is banned, not the banner community_id = db.Column(db.Integer, db.ForeignKey('community.id'), primary_key=True) banned_by = db.Column(db.Integer, db.ForeignKey('user.id')) banned_until = db.Column(db.DateTime) reason = db.Column(db.String(256)) created_at = db.Column(db.DateTime, default=utcnow) ban_until = db.Column(db.DateTime) class UserNote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) target_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) body = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) class UserBlock(db.Model): id = db.Column(db.Integer, primary_key=True) blocker_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) blocked_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class Settings(db.Model): name = db.Column(db.String(50), primary_key=True) value = db.Column(db.String(1024)) class Interest(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) communities = db.Column(db.Text) class CommunityJoinRequest(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) class UserFollowRequest(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) follow_id = db.Column(db.Integer, db.ForeignKey('user.id')) class UserRegistration(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) answer = db.Column(db.String(512)) status = db.Column(db.Integer, default=0, index=True) # 0 = unapproved, 1 = approved created_at = db.Column(db.DateTime, default=utcnow) approved_at = db.Column(db.DateTime) approved_by = db.Column(db.Integer, db.ForeignKey('user.id')) user = db.relationship('User', foreign_keys=[user_id], lazy='joined') class PostVote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) author_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) effect = db.Column(db.Float, index=True) created_at = db.Column(db.DateTime, default=utcnow) post = db.relationship('Post', foreign_keys=[post_id]) class PostReplyVote(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # who voted author_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # the author of the reply voted on - who's reputation is affected post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id'), index=True) effect = db.Column(db.Float) created_at = db.Column(db.DateTime, default=utcnow) # save every activity to a log, to aid debugging class ActivityPubLog(db.Model): id = db.Column(db.Integer, primary_key=True) direction = db.Column(db.String(3)) # 'in' or 'out' activity_id = db.Column(db.String(256), index=True) activity_type = db.Column(db.String(50)) # e.g. 'Follow', 'Accept', 'Like', etc activity_json = db.Column(db.Text) # the full json of the activity result = db.Column(db.String(10)) # 'success' or 'failure' exception_message = db.Column(db.Text) created_at = db.Column(db.DateTime, default=utcnow) class Filter(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(50)) filter_home = db.Column(db.Boolean, default=True) filter_posts = db.Column(db.Boolean, default=True) filter_replies = db.Column(db.Boolean, default=False) hide_type = db.Column(db.Integer, default=0) # 0 = hide with warning, 1 = hide completely user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) expire_after = db.Column(db.Date) keywords = db.Column(db.String(500)) def keywords_string(self): if self.keywords is None or self.keywords == '': return '' split_keywords = [kw.strip() for kw in self.keywords.split('\n')] return ', '.join(split_keywords) class Role(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) weight = db.Column(db.Integer, default=0) permissions = db.relationship('RolePermission') class RolePermission(db.Model): role_id = db.Column(db.Integer, db.ForeignKey('role.id'), primary_key=True) permission = db.Column(db.String, primary_key=True, index=True) class Notification(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(50)) url = db.Column(db.String(512)) read = db.Column(db.Boolean, default=False) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # who the notification should go to author_id = db.Column(db.Integer, db.ForeignKey('user.id')) # the person who caused the notification to happen created_at = db.Column(db.DateTime, default=utcnow) class Report(db.Model): id = db.Column(db.Integer, primary_key=True) reasons = db.Column(db.String(256)) description = db.Column(db.String(256)) status = db.Column(db.Integer, default=0) # 0 = new, 1 = escalated to admin, 2 = being appealed, 3 = resolved, 4 = discarded type = db.Column(db.Integer, default=0) # 0 = user, 1 = post, 2 = reply, 3 = community, 4 = conversation reporter_id = db.Column(db.Integer, db.ForeignKey('user.id')) suspect_community_id = db.Column(db.Integer, db.ForeignKey('community.id')) suspect_user_id = db.Column(db.Integer, db.ForeignKey('user.id')) suspect_post_id = db.Column(db.Integer, db.ForeignKey('post.id')) suspect_post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id')) suspect_conversation_id = db.Column(db.Integer, db.ForeignKey('conversation.id')) in_community_id = db.Column(db.Integer, db.ForeignKey('community.id')) source_instance_id = db.Column(db.Integer, db.ForeignKey('instance.id')) # the instance of the reporter. mostly used to distinguish between local (instance 1) and remote reports created_at = db.Column(db.DateTime, default=utcnow) updated = db.Column(db.DateTime, default=utcnow) # textual representation of self.type def type_text(self): types = ('User', 'Post', 'Comment', 'Community', 'Conversation') if self.type is None: return '' else: return types[self.type] def is_local(self): return self.source_instance_id == 1 class NotificationSubscription(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(256)) # to avoid needing to look up the thing subscribed to via entity_id type = db.Column(db.Integer, default=0, index=True) # see constants.py for possible values: NOTIF_* entity_id = db.Column(db.Integer, index=True) # ID of the user, post, community, etc being subscribed to user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) # To whom this subscription belongs created_at = db.Column(db.DateTime, default=utcnow) # Perhaps very old subscriptions can be automatically deleted class Poll(db.Model): post_id = db.Column(db.Integer, db.ForeignKey('post.id'), primary_key=True) end_poll = db.Column(db.DateTime) mode = db.Column(db.String(10)) # 'single' or 'multiple' determines whether people can vote for one or multiple options local_only = db.Column(db.Boolean) latest_vote = db.Column(db.DateTime) def has_voted(self, user_id): existing_vote = PollChoiceVote.query.filter(PollChoiceVote.user_id == user_id, PollChoiceVote.post_id == self.post_id).first() return existing_vote is not None def vote_for_choice(self, choice_id, user_id): existing_vote = PollChoiceVote.query.filter(PollChoiceVote.user_id == user_id, PollChoiceVote.choice_id == choice_id).first() if not existing_vote: new_vote = PollChoiceVote(choice_id=choice_id, user_id=user_id, post_id=self.post_id) db.session.add(new_vote) choice = PollChoice.query.get(choice_id) choice.num_votes += 1 self.latest_vote = datetime.utcnow() db.session.commit() def total_votes(self): return db.session.execute(text('SELECT SUM(num_votes) as s FROM "poll_choice" WHERE post_id = :post_id'), {'post_id': self.post_id}).scalar() class PollChoice(db.Model): id = db.Column(db.Integer, primary_key=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) choice_text = db.Column(db.String(200)) sort_order = db.Column(db.Integer) num_votes = db.Column(db.Integer, default=0) def percentage(self, poll_total_votes): return math.ceil(self.num_votes / poll_total_votes * 100) class PollChoiceVote(db.Model): choice_id = db.Column(db.Integer, db.ForeignKey('poll_choice.id'), primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class PostBookmark(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class PostReplyBookmark(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) post_reply_id = db.Column(db.Integer, db.ForeignKey('post_reply.id'), index=True) created_at = db.Column(db.DateTime, default=utcnow) class ModLog(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), index=True) community_id = db.Column(db.Integer, db.ForeignKey('community.id'), index=True) type = db.Column(db.String(10)) # 'mod' or 'admin' action = db.Column(db.String(30)) # 'removing post', 'banning from community', etc reason = db.Column(db.String(512)) link = db.Column(db.String(512)) link_text = db.Column(db.String(512)) public = db.Column(db.Boolean, default=False) created_at = db.Column(db.DateTime, default=utcnow) community = db.relationship('Community', lazy='joined', foreign_keys=[community_id]) author = db.relationship('User', lazy='joined', foreign_keys=[user_id]) action_map = { 'add_mod': _l('Added moderator'), 'remove_mod': _l('Removed moderator'), 'featured_post': _l('Featured post'), 'unfeatured_post': _l('Unfeatured post'), 'delete_post': _l('Deleted post'), 'restore_post': _l('Un-deleted post'), 'delete_post_reply': _l('Deleted comment'), 'restore_post_reply': _l('Un-deleted comment'), 'delete_community': _l('Deleted community'), 'delete_user': _l('Deleted account'), 'undelete_user': _l('Restored account'), 'ban_user': _l('Banned account'), 'unban_user': _l('Un-banned account'), } def action_to_str(self): if self.action in self.action_map: return self.action_map[self.action] else: return self.action class IpBan(db.Model): id = db.Column(db.Integer, primary_key=True) ip_address = db.Column(db.String(50), index=True) notes = db.Column(db.String(150)) created_at = db.Column(db.DateTime, default=utcnow) class Site(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(256)) description = db.Column(db.String(256)) icon_id = db.Column(db.Integer, db.ForeignKey('file.id')) sidebar = db.Column(db.Text, default='') legal_information = db.Column(db.Text, default='') public_key = db.Column(db.Text) private_key = db.Column(db.Text) enable_downvotes = db.Column(db.Boolean, default=True) allow_local_image_posts = db.Column(db.Boolean, default=True) remote_image_cache_days = db.Column(db.Integer, default=30) enable_nsfw = db.Column(db.Boolean, default=False) enable_nsfl = db.Column(db.Boolean, default=False) community_creation_admin_only = db.Column(db.Boolean, default=False) reports_email_admins = db.Column(db.Boolean, default=True) registration_mode = db.Column(db.String(20), default='Closed') # possible values: Open, RequireApplication, Closed application_question = db.Column(db.Text, default='') allow_or_block_list = db.Column(db.Integer, default=2) # 1 = allow list, 2 = block list allowlist = db.Column(db.Text, default='') blocklist = db.Column(db.Text, default='') blocked_phrases = db.Column(db.Text, default='') # discard incoming content with these phrases auto_decline_referrers = db.Column(db.Text, default='rdrama.net\nahrefs.com') # automatically decline registration requests if the referrer is one of these created_at = db.Column(db.DateTime, default=utcnow) updated = db.Column(db.DateTime, default=utcnow) last_active = db.Column(db.DateTime, default=utcnow) log_activitypub_json = db.Column(db.Boolean, default=False) default_theme = db.Column(db.String(20), default='') contact_email = db.Column(db.String(255), default='') about = db.Column(db.Text, default='') logo = db.Column(db.String(40), default='') logo_152 = db.Column(db.String(40), default='') logo_32 = db.Column(db.String(40), default='') logo_16 = db.Column(db.String(40), default='') show_inoculation_block = db.Column(db.Boolean, default=True) @staticmethod def admins() -> List[User]: return User.query.filter_by(deleted=False, banned=False).join(user_role).filter(user_role.c.role_id == ROLE_ADMIN).order_by(User.id).all() @staticmethod def staff() -> List[User]: return User.query.filter_by(deleted=False, banned=False).join(user_role).filter(user_role.c.role_id == ROLE_STAFF).order_by(User.id).all() #class IngressQueue(db.Model): # id = db.Column(db.Integer, primary_key=True) # waiting_for = db.Column(db.String(255), index=True) # The AP ID of the object we're waiting to be created before this Activity can be ingested # activity_pub_log_id = db.Column(db.Integer, db.ForeignKey('activity_pub_log.id')) # The original Activity that failed because some target object does not exist # ap_date_published = db.Column(db.DateTime, default=utcnow) # The value of the datePublished field on the Activity # created_at = db.Column(db.DateTime, default=utcnow) # expires = db.Column(db.DateTime, default=utcnow) # When to give up waiting and delete this row # # @login.user_loader def load_user(id): return User.query.get(int(id))