from datetime import date, datetime, timedelta import requests from flask import redirect, url_for, flash, request, make_response, session, Markup, current_app, abort from flask_login import login_user, logout_user, current_user, login_required from flask_babel import _ from pillow_heif import register_heif_opener from sqlalchemy import or_, desc from app import db, constants from app.activitypub.signature import RsaKeys, HttpSignature from app.community.forms import SearchRemoteCommunity, AddLocalCommunity, CreatePost, NewReplyForm from app.community.util import search_for_community, community_url_exists, actor_to_community, post_replies, \ get_comment_branch, post_reply_count, ensure_directory_exists, opengraph_parse, url_to_thumbnail_file from app.constants import SUBSCRIPTION_MEMBER, SUBSCRIPTION_OWNER, POST_TYPE_LINK, POST_TYPE_ARTICLE, POST_TYPE_IMAGE from app.models import User, Community, CommunityMember, CommunityJoinRequest, CommunityBan, Post, PostReply, \ PostReplyVote, PostVote, File from app.community import bp from app.utils import get_setting, render_template, allowlist_html, markdown_to_html, validation_required, \ shorten_string, markdown_to_text, domain_from_url, validate_image, gibberish import os from PIL import Image, ImageOps @bp.route('/add_local', methods=['GET', 'POST']) @login_required def add_local(): form = AddLocalCommunity() if get_setting('allow_nsfw', False) is False: form.nsfw.render_kw = {'disabled': True} if form.validate_on_submit() and not community_url_exists(form.url.data): # todo: more intense data validation if form.url.data.strip().lower().startswith('/c/'): form.url.data = form.url.data[3:] private_key, public_key = RsaKeys.generate_keypair() community = Community(title=form.community_name.data, name=form.url.data, description=form.description.data, rules=form.rules.data, nsfw=form.nsfw.data, private_key=private_key, public_key=public_key, ap_profile_id=current_app.config['SERVER_NAME'] + '/c/' + form.url.data, subscriptions_count=1) db.session.add(community) db.session.commit() membership = CommunityMember(user_id=current_user.id, community_id=community.id, is_moderator=True, is_owner=True) db.session.add(membership) db.session.commit() flash(_('Your new community has been created.')) return redirect('/c/' + community.name) return render_template('community/add_local.html', title=_('Create community'), form=form) @bp.route('/add_remote', methods=['GET', 'POST']) @login_required def add_remote(): form = SearchRemoteCommunity() new_community = None if form.validate_on_submit(): address = form.address.data.strip() if address.startswith('!') and '@' in address: new_community = search_for_community(address) elif address.startswith('@') and '@' in address[1:]: # todo: the user is searching for a person instead ... elif '@' in address: new_community = search_for_community('!' + address) else: message = Markup( 'Type address in the format !community@server.name. Search on Lemmyverse.net to find some.') flash(message, 'error') return render_template('community/add_remote.html', title=_('Add remote community'), form=form, new_community=new_community, subscribed=current_user.subscribed(new_community) >= SUBSCRIPTION_MEMBER) # @bp.route('/c/', methods=['GET']) - defined in activitypub/routes.py, which calls this function for user requests. A bit weird. def show_community(community: Community): mods = community.moderators() is_moderator = current_user.is_authenticated and any(mod.user_id == current_user.id for mod in mods) is_owner = current_user.is_authenticated and any( mod.user_id == current_user.id and mod.is_owner == True for mod in mods) if community.private_mods: mod_list = [] else: mod_user_ids = [mod.user_id for mod in mods] mod_list = User.query.filter(User.id.in_(mod_user_ids)).all() if current_user.is_anonymous or current_user.ignore_bots: posts = community.posts.filter(Post.from_bot == False).order_by(desc(Post.last_active)).all() else: posts = community.posts.order_by(desc(Post.last_active)).all() description = shorten_string(community.description, 150) if community.description else None og_image = community.image.source_url if community.image_id else None return render_template('community/community.html', community=community, title=community.title, is_moderator=is_moderator, is_owner=is_owner, mods=mod_list, posts=posts, description=description, og_image=og_image, POST_TYPE_IMAGE=POST_TYPE_IMAGE, POST_TYPE_LINK=POST_TYPE_LINK) @bp.route('//subscribe', methods=['GET']) @login_required @validation_required def subscribe(actor): remote = False actor = actor.strip() if '@' in actor: community = Community.query.filter_by(banned=False, ap_id=actor).first() remote = True else: community = Community.query.filter_by(name=actor, banned=False, ap_id=None).first() if community is not None: if not current_user.subscribed(community): if remote: # send ActivityPub message to remote community, asking to follow. Accept message will be sent to our shared inbox join_request = CommunityJoinRequest(user_id=current_user.id, community_id=community.id) db.session.add(join_request) db.session.commit() follow = { "actor": f"https://{current_app.config['SERVER_NAME']}/u/{current_user.user_name}", "to": [community.ap_profile_id], "object": community.ap_profile_id, "type": "Follow", "id": f"https://{current_app.config['SERVER_NAME']}/activities/follow/{join_request.id}" } try: message = HttpSignature.signed_request(community.ap_inbox_url, follow, current_user.private_key, current_user.profile_id() + '#main-key') if message.status_code == 200: flash('Your request to subscribe has been sent to ' + community.title) else: flash('Response status code was not 200', 'warning') current_app.logger.error('Response code for subscription attempt was ' + str(message.status_code) + ' ' + message.text) except Exception as ex: flash('Failed to send request to subscribe: ' + str(ex), 'error') current_app.logger.error("Exception while trying to subscribe" + str(ex)) else: # for local communities, joining is instant banned = CommunityBan.query.filter_by(user_id=current_user.id, community_id=community.id).first() if banned: flash('You cannot join this community') member = CommunityMember(user_id=current_user.id, community_id=community.id) db.session.add(member) db.session.commit() flash('You are subscribed to ' + community.title) referrer = request.headers.get('Referer', None) if referrer is not None: return redirect(referrer) else: return redirect('/c/' + actor) else: abort(404) @bp.route('//unsubscribe', methods=['GET']) @login_required def unsubscribe(actor): community = actor_to_community(actor) if community is not None: subscription = current_user.subscribed(community) if subscription: if subscription != SUBSCRIPTION_OWNER: db.session.query(CommunityMember).filter_by(user_id=current_user.id, community_id=community.id).delete() db.session.commit() flash('You are unsubscribed from ' + community.title) else: # todo: community deletion flash('You need to make someone else the owner before unsubscribing.', 'warning') # send them back where they came from referrer = request.headers.get('Referer', None) if referrer is not None: return redirect(referrer) else: return redirect('/c/' + actor) else: abort(404) @bp.route('//submit', methods=['GET', 'POST']) @login_required @validation_required def add_post(actor): community = actor_to_community(actor) form = CreatePost() if get_setting('allow_nsfw', False) is False: form.nsfw.render_kw = {'disabled': True} if get_setting('allow_nsfl', False) is False: form.nsfl.render_kw = {'disabled': True} images_disabled = 'disabled' if not get_setting('allow_local_image_posts', True) else '' form.communities.choices = [(c.id, c.display_name()) for c in current_user.communities()] if form.validate_on_submit(): post = Post(user_id=current_user.id, community_id=form.communities.data, nsfw=form.nsfw.data, nsfl=form.nsfl.data) if form.type.data == '' or form.type.data == 'discussion': post.title = form.discussion_title.data post.body = form.discussion_body.data post.body_html = markdown_to_html(post.body) post.type = POST_TYPE_ARTICLE elif form.type.data == 'link': post.title = form.link_title.data post.url = form.link_url.data post.type = POST_TYPE_LINK domain = domain_from_url(form.link_url.data) domain.post_count += 1 post.domain = domain valid_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.webp'} unused, file_extension = os.path.splitext(form.link_url.data) # do not use _ here instead of 'unused' # this url is a link to an image - generate a thumbnail of it if file_extension in valid_extensions: file = url_to_thumbnail_file(form.link_url.data) if file: post.image = file db.session.add(file) else: # check opengraph tags on the page and make a thumbnail if an image is available in the og:image meta tag opengraph = opengraph_parse(form.link_url.data) if opengraph and opengraph.get('og:image', '') != '': filename = opengraph.get('og:image') valid_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.webp'} unused, file_extension = os.path.splitext(filename) if file_extension.lower() in valid_extensions: file = url_to_thumbnail_file(filename) if file: file.alt_text = opengraph.get('og:title') post.image = file db.session.add(file) elif form.type.data == 'image': allowed_extensions = ['.gif', '.jpg', '.jpeg', '.png', '.webp', '.heic'] post.title = form.image_title.data post.type = POST_TYPE_IMAGE uploaded_file = request.files['image_file'] if uploaded_file.filename != '': file_ext = os.path.splitext(uploaded_file.filename)[1] if file_ext.lower() not in allowed_extensions or file_ext != validate_image( uploaded_file.stream): abort(400) new_filename = gibberish(15) directory = 'app/static/media/posts/' + new_filename[0:2] + '/' + new_filename[2:4] ensure_directory_exists(directory) final_place = os.path.join(directory, new_filename + file_ext) final_place_thumbnail = os.path.join(directory, new_filename + '_thumbnail.webp') uploaded_file.save(final_place) if file_ext.lower() == '.heic': register_heif_opener() # resize if necessary img = Image.open(final_place) img_width = img.width img_height = img.height img = ImageOps.exif_transpose(img) if img.width > 2000 or img.height > 2000: img.thumbnail((2000, 2000)) img.save(final_place) img_width = img.width img_height = img.height img.thumbnail((256, 256)) img.save(final_place_thumbnail, format="WebP", quality=93) thumbnail_width = img.width thumbnail_height = img.height file = File(file_path=final_place, file_name=new_filename + file_ext, alt_text=form.image_title.data, width=img_width, height=img_height, thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height, thumbnail_path=final_place_thumbnail) post.image = file db.session.add(file) elif form.type.data == 'poll': ... else: raise Exception('invalid post type') db.session.add(post) community.post_count += 1 db.session.commit() # todo: federate post creation out to followers flash('Post has been added') return redirect(f"/c/{community.link()}") else: form.communities.data = community.id form.notify.data = True return render_template('community/add_post.html', title=_('Add post to community'), form=form, community=community, images_disabled=images_disabled) @bp.route('/post/', methods=['GET', 'POST']) def show_post(post_id: int): post = Post.query.get_or_404(post_id) mods = post.community.moderators() is_moderator = current_user.is_authenticated and any(mod.user_id == current_user.id for mod in mods) form = NewReplyForm() if current_user.is_authenticated and current_user.verified and form.validate_on_submit(): reply = PostReply(user_id=current_user.id, post_id=post.id, community_id=post.community.id, body=form.body.data, body_html=markdown_to_html(form.body.data), body_html_safe=True, from_bot=current_user.bot, up_votes=1, nsfw=post.nsfw, nsfl=post.nsfl) db.session.add(reply) db.session.commit() reply_vote = PostReplyVote(user_id=current_user.id, author_id=current_user.id, post_reply_id=reply.id, effect=1.0) db.session.add(reply_vote) db.session.commit() form.body.data = '' flash('Your comment has been added.') # todo: flush cache # todo: federation return redirect(url_for('community.show_post', post_id=post_id)) # redirect to current page to avoid refresh resubmitting the form else: replies = post_replies(post.id, 'top') og_image = post.image.source_url if post.image_id else None description = shorten_string(markdown_to_text(post.body), 150) if post.body else None return render_template('community/post.html', title=post.title, post=post, is_moderator=is_moderator, canonical=post.ap_id, form=form, replies=replies, THREAD_CUTOFF_DEPTH=constants.THREAD_CUTOFF_DEPTH, description=description, og_image=og_image, POST_TYPE_IMAGE=constants.POST_TYPE_IMAGE, POST_TYPE_LINK=constants.POST_TYPE_LINK, POST_TYPE_ARTICLE=constants.POST_TYPE_ARTICLE) @bp.route('/post//', methods=['GET', 'POST']) @login_required @validation_required def post_vote(post_id: int, vote_direction): upvoted_class = downvoted_class = '' post = Post.query.get_or_404(post_id) existing_vote = PostVote.query.filter_by(user_id=current_user.id, post_id=post.id).first() if existing_vote: post.author.reputation -= existing_vote.effect if existing_vote.effect > 0: # previous vote was up if vote_direction == 'upvote': # new vote is also up, so remove it db.session.delete(existing_vote) post.up_votes -= 1 post.score -= 1 else: # new vote is down while previous vote was up, so reverse their previous vote existing_vote.effect = -1 post.up_votes -= 1 post.down_votes += 1 post.score -= 2 downvoted_class = 'voted_down' else: # previous vote was down if vote_direction == 'upvote': # new vote is upvote existing_vote.effect = 1 post.up_votes += 1 post.down_votes -= 1 post.score += 1 upvoted_class = 'voted_up' else: # reverse a previous downvote db.session.delete(existing_vote) post.down_votes -= 1 post.score += 2 else: if vote_direction == 'upvote': effect = 1 post.up_votes += 1 post.score += 1 upvoted_class = 'voted_up' else: effect = -1 post.down_votes += 1 post.score -= 1 downvoted_class = 'voted_down' vote = PostVote(user_id=current_user.id, post_id=post.id, author_id=post.author.id, effect=effect) post.author.reputation += effect db.session.add(vote) db.session.commit() return render_template('community/_post_voting_buttons.html', post=post, upvoted_class=upvoted_class, downvoted_class=downvoted_class) @bp.route('/comment//', methods=['POST']) @login_required @validation_required def comment_vote(comment_id, vote_direction): upvoted_class = downvoted_class = '' comment = PostReply.query.get_or_404(comment_id) existing_vote = PostReplyVote.query.filter_by(user_id=current_user.id, post_reply_id=comment.id).first() if existing_vote: if existing_vote.effect > 0: # previous vote was up if vote_direction == 'upvote': # new vote is also up, so remove it db.session.delete(existing_vote) comment.up_votes -= 1 comment.score -= 1 else: # new vote is down while previous vote was up, so reverse their previous vote existing_vote.effect = -1 comment.up_votes -= 1 comment.down_votes += 1 comment.score -= 2 downvoted_class = 'voted_down' else: # previous vote was down if vote_direction == 'upvote': # new vote is upvote existing_vote.effect = 1 comment.up_votes += 1 comment.down_votes -= 1 comment.score += 1 upvoted_class = 'voted_up' else: # reverse a previous downvote db.session.delete(existing_vote) comment.down_votes -= 1 comment.score += 2 else: if vote_direction == 'upvote': effect = 1 comment.up_votes += 1 comment.score += 1 upvoted_class = 'voted_up' else: effect = -1 comment.down_votes += 1 comment.score -= 1 downvoted_class = 'voted_down' vote = PostReplyVote(user_id=current_user.id, post_reply_id=comment_id, author_id=comment.author.id, effect=effect) comment.author.reputation += effect db.session.add(vote) db.session.commit() return render_template('community/_voting_buttons.html', comment=comment, upvoted_class=upvoted_class, downvoted_class=downvoted_class) @bp.route('/post//comment/') def continue_discussion(post_id, comment_id): post = Post.query.get_or_404(post_id) comment = PostReply.query.get_or_404(comment_id) mods = post.community.moderators() is_moderator = current_user.is_authenticated and any(mod.user_id == current_user.id for mod in mods) replies = get_comment_branch(post.id, comment.id, 'top') return render_template('community/continue_discussion.html', title=_('Discussing %(title)s', title=post.title), post=post, is_moderator=is_moderator, comment=comment, replies=replies) @bp.route('/post//comment//reply', methods=['GET', 'POST']) @login_required def add_reply(post_id: int, comment_id: int): post = Post.query.get_or_404(post_id) comment = PostReply.query.get_or_404(comment_id) mods = post.community.moderators() is_moderator = current_user.is_authenticated and any(mod.user_id == current_user.id for mod in mods) form = NewReplyForm() if form.validate_on_submit(): reply = PostReply(user_id=current_user.id, post_id=post.id, parent_id=comment.id, depth=comment.depth + 1, community_id=post.community.id, body=form.body.data, body_html=markdown_to_html(form.body.data), body_html_safe=True, from_bot=current_user.bot, up_votes=1, nsfw=post.nsfw, nsfl=post.nsfl) db.session.add(reply) db.session.commit() reply_vote = PostReplyVote(user_id=current_user.id, author_id=current_user.id, post_reply_id=reply.id, effect=1.0) db.session.add(reply_vote) post.reply_count = post_reply_count(post.id) db.session.commit() form.body.data = '' flash('Your comment has been added.') # todo: flush cache # todo: federation if reply.depth <= constants.THREAD_CUTOFF_DEPTH: return redirect(url_for('community.show_post', post_id=post_id, _anchor=f'comment_{reply.parent_id}')) else: return redirect(url_for('community.continue_discussion', post_id=post_id, comment_id=reply.parent_id)) else: return render_template('community/add_reply.html', title=_('Discussing %(title)s', title=post.title), post=post, is_moderator=is_moderator, form=form, comment=comment)