From 95be488cc9f0e3a94fc632de79ba55711d72ce90 Mon Sep 17 00:00:00 2001 From: freamon Date: Sat, 2 Nov 2024 23:56:56 +0000 Subject: [PATCH] API: use Celery for federation tasks --- app/shared/community.py | 179 ++++----------- app/shared/post.py | 59 +---- app/shared/reply.py | 417 +++-------------------------------- app/shared/tasks/__init__.py | 30 +++ app/shared/tasks/deletes.py | 105 +++++++++ app/shared/tasks/flags.py | 58 +++++ app/shared/tasks/follows.py | 165 ++++++++++++++ app/shared/tasks/likes.py | 108 +++++++++ app/shared/tasks/notes.py | 205 +++++++++++++++++ app/user/utils.py | 9 +- 10 files changed, 747 insertions(+), 588 deletions(-) create mode 100644 app/shared/tasks/__init__.py create mode 100644 app/shared/tasks/deletes.py create mode 100644 app/shared/tasks/flags.py create mode 100644 app/shared/tasks/follows.py create mode 100644 app/shared/tasks/likes.py create mode 100644 app/shared/tasks/notes.py diff --git a/app/shared/community.py b/app/shared/community.py index 94aff35e..5b48aab0 100644 --- a/app/shared/community.py +++ b/app/shared/community.py @@ -1,171 +1,66 @@ from app import db, cache -from app.activitypub.signature import post_request from app.constants import * -from app.models import Community, CommunityBan, CommunityBlock, CommunityJoinRequest, CommunityMember -from app.utils import authorise_api_user, blocked_communities, community_membership, joined_communities, gibberish +from app.models import CommunityBlock, CommunityMember +from app.shared.tasks import task_selector +from app.utils import authorise_api_user, blocked_communities -from flask import abort, current_app, flash +from flask import current_app, flash from flask_babel import _ from flask_login import current_user + # would be in app/constants.py SRC_WEB = 1 SRC_PUB = 2 SRC_API = 3 +SRC_PLD = 4 # admin preload form to seed communities + # function can be shared between WEB and API (only API calls it for now) # call from admin.federation not tested -def join_community(community_id: int, src, auth=None, user_id=None, main_user_name=True): +def join_community(community_id: int, src, auth=None, user_id=None): if src == SRC_API: - community = Community.query.filter_by(id=community_id).one() - user = authorise_api_user(auth, return_type='model') - else: - community = Community.query.get_or_404(community_id) - if not user_id: - user = current_user - else: - user = User.query.get(user_id) + user_id = authorise_api_user(auth) - pre_load_message = {} - if community_membership(user, community) != SUBSCRIPTION_MEMBER and community_membership(user, community) != SUBSCRIPTION_PENDING: - banned = CommunityBan.query.filter_by(user_id=user.id, community_id=community.id).first() - if banned: - if src == SRC_API: - raise Exception('banned_from_community') - else: - if main_user_name: - flash(_('You cannot join this community')) - return - else: - pre_load_message['user_banned'] = True - return pre_load_message - else: - if src == SRC_API: - return user.id - else: - if not main_user_name: - pre_load_message['status'] = 'already subscribed, or subsciption pending' - return pre_load_message + send_async = not (current_app.debug or src == SRC_WEB) # False if using a browser - success = True - remote = not community.is_local() - if remote: - # send ActivityPub message to remote community, asking to follow. Accept message will be sent to our shared inbox - join_request = CommunityJoinRequest(user_id=user.id, community_id=community.id) - db.session.add(join_request) + sync_retval = task_selector('join_community', send_async, user_id=user_id, community_id=community_id, src=src) + + if send_async or sync_retval is True: + member = CommunityMember(user_id=user_id, community_id=community_id) + db.session.add(member) db.session.commit() - if community.instance.online(): - follow = { - "actor": user.public_url(main_user_name=main_user_name), - "to": [community.public_url()], - "object": community.public_url(), - "type": "Follow", - "id": f"https://{current_app.config['SERVER_NAME']}/activities/follow/{join_request.id}" - } - success = post_request(community.ap_inbox_url, follow, user.private_key, - user.public_url(main_user_name=main_user_name) + '#main-key', timeout=10) - if success is False or isinstance(success, str): - if 'is not in allowlist' in success: - if src == SRC_API: - raise Exception('not_in_remote_instance_allowlist') - else: - msg_to_user = f'{community.instance.domain} does not allow us to join their communities.' - if main_user_name: - flash(_(msg_to_user), 'error') - return - else: - pre_load_message['status'] = msg_to_user - return pre_load_message - else: - if src != SRC_API: - msg_to_user = "There was a problem while trying to communicate with remote server. If other people have already joined this community it won't matter." - if main_user_name: - flash(_(msg_to_user), 'error') - return - else: - pre_load_message['status'] = msg_to_user - return pre_load_message - # for local communities, joining is instant - member = CommunityMember(user_id=user.id, community_id=community.id) - db.session.add(member) - db.session.commit() - if success is True: - cache.delete_memoized(community_membership, user, community) - cache.delete_memoized(joined_communities, user.id) - if src == SRC_API: - return user.id - else: - if main_user_name: - flash('You joined ' + community.title) - else: - pre_load_message['status'] = 'joined' - - if not main_user_name: - return pre_load_message - - # for SRC_WEB, calling function should handle if the community isn't found + if src == SRC_API: + return user_id + elif src == SRC_PLD: + return sync_retval + else: + return # function can be shared between WEB and API (only API calls it for now) def leave_community(community_id: int, src, auth=None): - if src == SRC_API: - community = Community.query.filter_by(id=community_id).one() - user = authorise_api_user(auth, return_type='model') + user_id = authorise_api_user(auth) if src == SRC_API else current_user.id + cm = CommunityMember.query.filter_by(user_id=user_id, community_id=community_id).one() + if not cm.is_owner: + task_selector('leave_community', user_id=user_id, community_id=community_id) + + db.session.query(CommunityMember).filter_by(user_id=user_id, community_id=community_id).delete() + db.session.commit() + + if src == SRC_WEB: + flash('You have left the community') else: - community = Community.query.get_or_404(community_id) - user = current_user - - subscription = community_membership(user, community) - if subscription: - if subscription != SUBSCRIPTION_OWNER: - proceed = True - # Undo the Follow - if not community.is_local(): - success = True - if not community.instance.gone_forever: - undo_id = f"https://{current_app.config['SERVER_NAME']}/activities/undo/" + gibberish(15) - follow = { - "actor": user.public_url(), - "to": [community.public_url()], - "object": community.public_url(), - "type": "Follow", - "id": f"https://{current_app.config['SERVER_NAME']}/activities/follow/{gibberish(15)}" - } - undo = { - 'actor': user.public_url(), - 'to': [community.public_url()], - 'type': 'Undo', - 'id': undo_id, - 'object': follow - } - success = post_request(community.ap_inbox_url, undo, user.private_key, - user.public_url() + '#main-key', timeout=10) - if success is False or isinstance(success, str): - if src != SRC_API: - flash('There was a problem while trying to unsubscribe', 'error') - return - - if proceed: - db.session.query(CommunityMember).filter_by(user_id=user.id, community_id=community.id).delete() - db.session.query(CommunityJoinRequest).filter_by(user_id=user.id, community_id=community.id).delete() - db.session.commit() - - if src != SRC_API: - flash('You have left ' + community.title) - - cache.delete_memoized(community_membership, user, community) - cache.delete_memoized(joined_communities, user.id) + # todo: community deletion + if src == SRC_API: + raise Exception('need_to_make_someone_else_owner') else: - # todo: community deletion - if src == SRC_API: - raise Exception('need_to_make_someone_else_owner') - else: - flash('You need to make someone else the owner before unsubscribing.', 'warning') - return + flash('You need to make someone else the owner before unsubscribing.', 'warning') + return if src == SRC_API: - return user.id + return user_id else: # let calling function handle redirect return diff --git a/app/shared/post.py b/app/shared/post.py index c577f4a2..3df03a70 100644 --- a/app/shared/post.py +++ b/app/shared/post.py @@ -1,11 +1,10 @@ -from app import cache, db -from app.activitypub.signature import default_context, post_request_in_background -from app.community.util import send_to_remote_instance +from app import db from app.constants import * -from app.models import NotificationSubscription, Post, PostBookmark, User -from app.utils import gibberish, instance_banned, render_template, authorise_api_user, recently_upvoted_posts, recently_downvoted_posts, shorten_string +from app.models import NotificationSubscription, Post, PostBookmark +from app.shared.tasks import task_selector +from app.utils import render_template, authorise_api_user, shorten_string -from flask import abort, current_app, flash, redirect, request, url_for +from flask import abort, flash, redirect, request, url_for from flask_babel import _ from flask_login import current_user @@ -28,51 +27,7 @@ def vote_for_post(post_id: int, vote_direction, src, auth=None): undo = post.vote(user, vote_direction) - if not post.community.local_only: - if undo: - action_json = { - 'actor': user.public_url(not(post.community.instance.votes_are_public() and user.vote_privately())), - 'type': 'Undo', - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/undo/{gibberish(15)}", - 'audience': post.community.public_url(), - 'object': { - 'actor': user.public_url(not(post.community.instance.votes_are_public() and user.vote_privately())), - 'object': post.public_url(), - 'type': undo, - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/{undo.lower()}/{gibberish(15)}", - 'audience': post.community.public_url() - } - } - else: - action_type = 'Like' if vote_direction == 'upvote' else 'Dislike' - action_json = { - 'actor': user.public_url(not(post.community.instance.votes_are_public() and user.vote_privately())), - 'object': post.profile_id(), - 'type': action_type, - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/{action_type.lower()}/{gibberish(15)}", - 'audience': post.community.public_url() - } - if post.community.is_local(): - announce = { - "id": f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}", - "type": 'Announce', - "to": [ - "https://www.w3.org/ns/activitystreams#Public" - ], - "actor": post.community.public_url(), - "cc": [ - post.community.ap_followers_url - ], - '@context': default_context(), - 'object': action_json - } - for instance in post.community.following_instances(): - if instance.inbox and not user.has_blocked_instance(instance.id) and not instance_banned(instance.domain): - send_to_remote_instance(instance.id, post.community.id, announce) - else: - post_request_in_background(post.community.ap_inbox_url, action_json, user.private_key, - user.public_url(not(post.community.instance.votes_are_public() and user.vote_privately())) + '#main-key') - + task_selector('vote_for_post', user_id=user.id, post_id=post_id, vote_to_undo=undo, vote_direction=vote_direction) if src == SRC_API: return user.id @@ -83,8 +38,6 @@ def vote_for_post(post_id: int, vote_direction, src, auth=None): recently_upvoted = [post_id] elif vote_direction == 'downvote' and undo is None: recently_downvoted = [post_id] - cache.delete_memoized(recently_upvoted_posts, user.id) - cache.delete_memoized(recently_downvoted_posts, user.id) template = 'post/_post_voting_buttons.html' if request.args.get('style', '') == '' else 'post/_post_voting_buttons_masonry.html' return render_template(template, post=post, community=post.community, recently_upvoted=recently_upvoted, diff --git a/app/shared/reply.py b/app/shared/reply.py index 357a5875..d75153ec 100644 --- a/app/shared/reply.py +++ b/app/shared/reply.py @@ -3,6 +3,7 @@ from app.activitypub.signature import default_context, post_request_in_backgroun from app.community.util import send_to_remote_instance from app.constants import * from app.models import Instance, Notification, NotificationSubscription, Post, PostReply, PostReplyBookmark, Report, Site, User, utcnow +from app.shared.tasks import task_selector from app.utils import gibberish, instance_banned, render_template, authorise_api_user, recently_upvoted_post_replies, recently_downvoted_post_replies, shorten_string, \ piefed_markdown_to_lemmy_markdown, markdown_to_html, ap_datetime @@ -18,7 +19,6 @@ SRC_API = 3 # function can be shared between WEB and API (only API calls it for now) # comment_vote in app/post/routes would just need to do 'return vote_for_reply(reply_id, vote_direction, SRC_WEB)' - def vote_for_reply(reply_id: int, vote_direction, src, auth=None): if src == SRC_API: reply = PostReply.query.filter_by(id=reply_id).one() @@ -29,50 +29,7 @@ def vote_for_reply(reply_id: int, vote_direction, src, auth=None): undo = reply.vote(user, vote_direction) - if not reply.community.local_only: - if undo: - action_json = { - 'actor': user.public_url(not(reply.community.instance.votes_are_public() and user.vote_privately())), - 'type': 'Undo', - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/undo/{gibberish(15)}", - 'audience': reply.community.public_url(), - 'object': { - 'actor': user.public_url(not(reply.community.instance.votes_are_public() and user.vote_privately())), - 'object': reply.public_url(), - 'type': undo, - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/{undo.lower()}/{gibberish(15)}", - 'audience': reply.community.public_url() - } - } - else: - action_type = 'Like' if vote_direction == 'upvote' else 'Dislike' - action_json = { - 'actor': user.public_url(not(reply.community.instance.votes_are_public() and user.vote_privately())), - 'object': reply.public_url(), - 'type': action_type, - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/{action_type.lower()}/{gibberish(15)}", - 'audience': reply.community.public_url() - } - if reply.community.is_local(): - announce = { - "id": f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}", - "type": 'Announce', - "to": [ - "https://www.w3.org/ns/activitystreams#Public" - ], - "actor": reply.community.ap_profile_id, - "cc": [ - reply.community.ap_followers_url - ], - '@context': default_context(), - 'object': action_json - } - for instance in reply.community.following_instances(): - if instance.inbox and not user.has_blocked_instance(instance.id) and not instance_banned(instance.domain): - send_to_remote_instance(instance.id, reply.community.id, announce) - else: - post_request_in_background(reply.community.ap_inbox_url, action_json, user.private_key, - user.public_url(not(reply.community.instance.votes_are_public() and user.vote_privately())) + '#main-key') + task_selector('vote_for_reply', user_id=user.id, reply_id=reply_id, vote_to_undo=undo, vote_direction=vote_direction) if src == SRC_API: return user.id @@ -83,8 +40,6 @@ def vote_for_reply(reply_id: int, vote_direction, src, auth=None): recently_upvoted = [reply_id] elif vote_direction == 'downvote' and undo is None: recently_downvoted = [reply_id] - cache.delete_memoized(recently_upvoted_post_replies, user.id) - cache.delete_memoized(recently_downvoted_post_replies, user.id) return render_template('post/_reply_voting_buttons.html', comment=reply, recently_upvoted_replies=recently_upvoted, @@ -206,8 +161,8 @@ def basic_rate_limit_check(user): def make_reply(input, post, parent_id, src, auth=None): if src == SRC_API: user = authorise_api_user(auth, return_type='model') - if not basic_rate_limit_check(user): - raise Exception('rate_limited') + #if not basic_rate_limit_check(user): + # raise Exception('rate_limited') content = input['body'] notify_author = input['notify_author'] language_id = input['language_id'] @@ -235,104 +190,7 @@ def make_reply(input, post, parent_id, src, auth=None): input.body.data = '' flash('Your comment has been added.') - # federation - if parent_id: - in_reply_to = parent_reply - else: - in_reply_to = post - - if not post.community.local_only: - reply_json = { - 'type': 'Note', - 'id': reply.public_url(), - 'attributedTo': user.public_url(), - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'cc': [ - post.community.public_url(), - in_reply_to.author.public_url() - ], - 'content': reply.body_html, - 'inReplyTo': in_reply_to.profile_id(), - 'url': reply.profile_id(), - 'mediaType': 'text/html', - 'source': {'content': reply.body, 'mediaType': 'text/markdown'}, - 'published': ap_datetime(utcnow()), - 'distinguished': False, - 'audience': post.community.public_url(), - 'contentMap': { - 'en': reply.body_html - }, - 'language': { - 'identifier': reply.language_code(), - 'name': reply.language_name() - } - } - create_json = { - '@context': default_context(), - 'type': 'Create', - 'actor': user.public_url(), - 'audience': post.community.public_url(), - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'cc': [ - post.community.public_url(), - in_reply_to.author.public_url() - ], - 'object': reply_json, - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/create/{gibberish(15)}" - } - if in_reply_to.notify_author and in_reply_to.author.ap_id is not None: - reply_json['tag'] = [ - { - 'href': in_reply_to.author.public_url(), - 'name': in_reply_to.author.mention_tag(), - 'type': 'Mention' - } - ] - create_json['tag'] = [ - { - 'href': in_reply_to.author.public_url(), - 'name': in_reply_to.author.mention_tag(), - 'type': 'Mention' - } - ] - if not post.community.is_local(): # this is a remote community, send it to the instance that hosts it - success = post_request(post.community.ap_inbox_url, create_json, user.private_key, - user.public_url() + '#main-key') - if src == SRC_WEB: - if success is False or isinstance(success, str): - flash('Failed to send reply', 'error') - else: # local community - send it to followers on remote instances - del create_json['@context'] - announce = { - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}", - 'type': 'Announce', - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'actor': post.community.public_url(), - 'cc': [ - post.community.ap_followers_url - ], - '@context': default_context(), - 'object': create_json - } - for instance in post.community.following_instances(): - if instance.inbox and not user.has_blocked_instance(instance.id) and not instance_banned(instance.domain): - send_to_remote_instance(instance.id, post.community.id, announce) - - # send copy of Note to comment author (who won't otherwise get it if no-one else on their instance is subscribed to the community) - if not in_reply_to.author.is_local() and in_reply_to.author.ap_domain != reply.community.ap_domain: - if not post.community.is_local() or (post.community.is_local and not post.community.has_followers_from_domain(in_reply_to.author.ap_domain)): - success = post_request(in_reply_to.author.ap_inbox_url, create_json, user.private_key, user.public_url() + '#main-key') - if success is False or isinstance(success, str): - # sending to shared inbox is good enough for Mastodon, but Lemmy will reject it the local community has no followers - personal_inbox = in_reply_to.author.public_url() + '/inbox' - post_request(personal_inbox, create_json, user.private_key, user.public_url() + '#main-key') - + task_selector('make_reply', user_id=user.id, reply_id=reply.id, parent_id=parent_id) if src == SRC_API: return user.id, reply @@ -364,105 +222,7 @@ def edit_reply(input, reply, post, src, auth=None): if src == SRC_WEB: flash(_('Your changes have been saved.'), 'success') - if reply.parent_id: - in_reply_to = PostReply.query.filter_by(id=reply.parent_id).one() - else: - in_reply_to = post - - # federate edit - if not post.community.local_only: - reply_json = { - 'type': 'Note', - 'id': reply.public_url(), - 'attributedTo': user.public_url(), - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'cc': [ - post.community.public_url(), - in_reply_to.author.public_url() - ], - 'content': reply.body_html, - 'inReplyTo': in_reply_to.profile_id(), - 'url': reply.public_url(), - 'mediaType': 'text/html', - 'source': {'content': reply.body, 'mediaType': 'text/markdown'}, - 'published': ap_datetime(reply.posted_at), - 'updated': ap_datetime(reply.edited_at), - 'distinguished': False, - 'audience': post.community.public_url(), - 'contentMap': { - 'en': reply.body_html - }, - 'language': { - 'identifier': reply.language_code(), - 'name': reply.language_name() - } - } - update_json = { - '@context': default_context(), - 'type': 'Update', - 'actor': user.public_url(), - 'audience': post.community.public_url(), - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'cc': [ - post.community.public_url(), - in_reply_to.author.public_url() - ], - 'object': reply_json, - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/update/{gibberish(15)}" - } - if in_reply_to.notify_author and in_reply_to.author.ap_id is not None: - reply_json['tag'] = [ - { - 'href': in_reply_to.author.public_url(), - 'name': in_reply_to.author.mention_tag(), - 'type': 'Mention' - } - ] - update_json['tag'] = [ - { - 'href': in_reply_to.author.public_url(), - 'name': in_reply_to.author.mention_tag(), - 'type': 'Mention' - } - ] - if not post.community.is_local(): # this is a remote community, send it to the instance that hosts it - success = post_request(post.community.ap_inbox_url, update_json, user.private_key, - user.public_url() + '#main-key') - if src == SRC_WEB: - if success is False or isinstance(success, str): - flash('Failed to send send edit to remote server', 'error') - else: # local community - send it to followers on remote instances - del update_json['@context'] - announce = { - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}", - 'type': 'Announce', - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'actor': post.community.public_url(), - 'cc': [ - post.community.ap_followers_url - ], - '@context': default_context(), - 'object': update_json - } - - for instance in post.community.following_instances(): - if instance.inbox and not user.has_blocked_instance(instance.id) and not instance_banned(instance.domain): - send_to_remote_instance(instance.id, post.community.id, announce) - - # send copy of Note to post author (who won't otherwise get it if no-one else on their instance is subscribed to the community) - if not in_reply_to.author.is_local() and in_reply_to.author.ap_domain != reply.community.ap_domain: - if not post.community.is_local() or (post.community.is_local and not post.community.has_followers_from_domain(in_reply_to.author.ap_domain)): - success = post_request(in_reply_to.author.ap_inbox_url, update_json, user.private_key, user.public_url() + '#main-key') - if success is False or isinstance(success, str): - # sending to shared inbox is good enough for Mastodon, but Lemmy will reject it the local community has no followers - personal_inbox = in_reply_to.author.public_url() + '/inbox' - post_request(personal_inbox, update_json, user.private_key, user.public_url() + '#main-key') + task_selector('edit_reply', user_id=user.id, reply_id=reply.id, parent_id=reply.parent_id) if src == SRC_API: return user.id, reply @@ -471,77 +231,28 @@ def edit_reply(input, reply, post, src, auth=None): # just for deletes by owner (mod deletes are classed as 'remove') -# just for API for now, as WEB version needs attention to ensure that replies can be 'undeleted' def delete_reply(reply_id, src, auth): if src == SRC_API: reply = PostReply.query.filter_by(id=reply_id, deleted=False).one() - post = Post.query.filter_by(id=reply.post_id).one() - user = authorise_api_user(auth, return_type='model', id_match=reply.user_id) + user_id = authorise_api_user(auth, id_match=reply.user_id) else: reply = PostReply.query.get_or_404(reply_id) - post = Post.query.get_or_404(reply.post_id) - user = current_user + user_id = current_user.id reply.deleted = True - reply.deleted_by = user.id - # everything else (votes, body, reports, bookmarks, subscriptions, etc) only wants deleting when it's properly purged after 7 days - # reply_view will return '' in body if reply.deleted == True + reply.deleted_by = user_id if not reply.author.bot: - post.reply_count -= 1 + reply.post.reply_count -= 1 reply.author.post_reply_count -= 1 db.session.commit() if src == SRC_WEB: flash(_('Comment deleted.')) - # federate delete - if not post.community.local_only: - delete_json = { - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/delete/{gibberish(15)}", - 'type': 'Delete', - 'actor': user.public_url(), - 'audience': post.community.public_url(), - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'published': ap_datetime(utcnow()), - 'cc': [ - post.community.public_url(), - user.followers_url() - ], - 'object': reply.ap_id, - '@context': default_context() - } - - if not post.community.is_local(): # this is a remote community, send it to the instance that hosts it - success = post_request(post.community.ap_inbox_url, delete_json, user.private_key, - user.public_url() + '#main-key') - if src == SRC_WEB: - if success is False or isinstance(success, str): - flash('Failed to send delete to remote server', 'error') - - else: # local community - send it to followers on remote instances - del delete_json['@context'] - announce = { - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}", - 'type': 'Announce', - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'actor': post.community.public_url(), - 'cc': [ - post.community.public_url() + '/followers' - ], - '@context': default_context(), - 'object': delete_json - } - - for instance in post.community.following_instances(): - if instance.inbox: - send_to_remote_instance(instance.id, post.community.id, announce) + task_selector('delete_reply', user_id=user_id, reply_id=reply.id) if src == SRC_API: - return user.id, reply + return user_id, reply else: return @@ -549,87 +260,27 @@ def delete_reply(reply_id, src, auth): def restore_reply(reply_id, src, auth): if src == SRC_API: reply = PostReply.query.filter_by(id=reply_id, deleted=True).one() - post = Post.query.filter_by(id=reply.post_id).one() - user = authorise_api_user(auth, return_type='model', id_match=reply.user_id) - if reply.deleted_by and reply.user_id != reply.deleted_by: + user_id = authorise_api_user(auth, id_match=reply.user_id) + if reply.user_id != reply.deleted_by: raise Exception('incorrect_login') else: reply = PostReply.query.get_or_404(reply_id) - post = Post.query.get_or_404(reply.post_id) - user = current_user + user_id = current_user.id reply.deleted = False reply.deleted_by = None if not reply.author.bot: - post.reply_count += 1 + reply.post.reply_count += 1 reply.author.post_reply_count += 1 db.session.commit() if src == SRC_WEB: flash(_('Comment restored.')) - # federate undelete - if not post.community.local_only: - delete_json = { - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/delete/{gibberish(15)}", - 'type': 'Delete', - 'actor': user.public_url(), - 'audience': post.community.public_url(), - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'published': ap_datetime(utcnow()), - 'cc': [ - post.community.public_url(), - user.followers_url() - ], - 'object': reply.ap_id - } - undo_json = { - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/undo/{gibberish(15)}", - 'type': 'Undo', - 'actor': user.public_url(), - 'audience': post.community.public_url(), - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'cc': [ - post.community.public_url(), - user.followers_url() - ], - 'object': delete_json, - '@context': default_context() - } - - if not post.community.is_local(): # this is a remote community, send it to the instance that hosts it - success = post_request(post.community.ap_inbox_url, undo_json, user.private_key, - user.public_url() + '#main-key') - if src == SRC_WEB: - if success is False or isinstance(success, str): - flash('Failed to send delete to remote server', 'error') - - else: # local community - send it to followers on remote instances - del undo_json['@context'] - announce = { - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}", - 'type': 'Announce', - 'to': [ - 'https://www.w3.org/ns/activitystreams#Public' - ], - 'actor': post.community.public_url(), - 'cc': [ - post.community.public_url() + '/followers' - ], - '@context': default_context(), - 'object': undo_json - } - - for instance in post.community.following_instances(): - if instance.inbox: - send_to_remote_instance(instance.id, post.community.id, announce) + task_selector('restore_reply', user_id=user_id, reply_id=reply.id) if src == SRC_API: - return user.id, reply + return user_id, reply else: return @@ -637,13 +288,13 @@ def restore_reply(reply_id, src, auth): def report_reply(reply_id, input, src, auth=None): if src == SRC_API: reply = PostReply.query.filter_by(id=reply_id).one() - user = authorise_api_user(auth, return_type='model') + user_id = authorise_api_user(auth) reason = input['reason'] description = input['description'] report_remote = input['report_remote'] else: reply = PostReply.query.get_or_404(reply_id) - user = current_user + user_id = current_user.id reason = input.reasons_to_string(input.reasons.data) description = input.description.data report_remote = input.report_remote.data @@ -655,7 +306,7 @@ def report_reply(reply_id, input, src, auth=None): flash(_('Comment has already been reported, thank you!')) return - report = Report(reasons=reason, description=description, type=2, reporter_id=user.id, suspect_post_id=reply.post.id, suspect_community_id=reply.community.id, + report = Report(reasons=reason, description=description, type=2, reporter_id=user_id, suspect_post_id=reply.post.id, suspect_community_id=reply.community.id, suspect_user_id=reply.author.id, suspect_post_reply_id=reply.id, in_community_id=reply.community.id, source_instance_id=1) db.session.add(report) @@ -666,14 +317,14 @@ def report_reply(reply_id, input, src, auth=None): if moderator and moderator.is_local(): notification = Notification(user_id=mod.user_id, title=_('A comment has been reported'), url=f"https://{current_app.config['SERVER_NAME']}/comment/{reply.id}", - author_id=user.id) + author_id=user_id) db.session.add(notification) already_notified.add(mod.user_id) reply.reports += 1 # todo: only notify admins for certain types of report for admin in Site.admins(): if admin.id not in already_notified: - notify = Notification(title='Suspicious content', url='/admin/reports', user_id=admin.id, author_id=user.id) + notify = Notification(title='Suspicious content', url='/admin/reports', user_id=admin.id, author_id=user_id) db.session.add(notify) admin.unread_notifications += 1 db.session.commit() @@ -683,26 +334,10 @@ def report_reply(reply_id, input, src, auth=None): summary = reason if description: summary += ' - ' + description - report_json = { - 'actor': user.public_url(), - 'audience': reply.community.public_url(), - 'content': None, - 'id': f"https://{current_app.config['SERVER_NAME']}/activities/flag/{gibberish(15)}", - 'object': reply.ap_id, - 'summary': summary, - 'to': [ - reply.community.public_url() - ], - 'type': 'Flag' - } - instance = Instance.query.get(reply.community.instance_id) - if reply.community.ap_inbox_url and not user.has_blocked_instance(instance.id) and not instance_banned(instance.domain): - success = post_request(reply.community.ap_inbox_url, report_json, user.private_key, user.public_url() + '#main-key') - if success is False or isinstance(success, str): - if src == SRC_WEB: - flash('Failed to send report to remote server', 'error') + + task_selector('report_reply', user_id=user_id, reply_id=reply_id, summary=summary) if src == SRC_API: - return user.id, report + return user_id, report else: return diff --git a/app/shared/tasks/__init__.py b/app/shared/tasks/__init__.py new file mode 100644 index 00000000..3c0a980a --- /dev/null +++ b/app/shared/tasks/__init__.py @@ -0,0 +1,30 @@ +from app.shared.tasks.follows import join_community, leave_community +from app.shared.tasks.likes import vote_for_post, vote_for_reply +from app.shared.tasks.notes import make_reply, edit_reply +from app.shared.tasks.deletes import delete_reply, restore_reply +from app.shared.tasks.flags import report_reply + +from flask import current_app + + +def task_selector(task_key, send_async=True, **kwargs): + tasks = { + 'join_community': join_community, + 'leave_community': leave_community, + 'vote_for_post': vote_for_post, + 'vote_for_reply': vote_for_reply, + 'make_reply': make_reply, + 'edit_reply': edit_reply, + 'delete_reply': delete_reply, + 'restore_reply': restore_reply, + 'report_reply': report_reply + } + + if current_app.debug: + send_async = False + + if send_async: + tasks[task_key].delay(send_async=send_async, **kwargs) + else: + return tasks[task_key](send_async=send_async, **kwargs) + diff --git a/app/shared/tasks/deletes.py b/app/shared/tasks/deletes.py new file mode 100644 index 00000000..6ff31c79 --- /dev/null +++ b/app/shared/tasks/deletes.py @@ -0,0 +1,105 @@ +from app import celery +from app.activitypub.signature import default_context, post_request +from app.models import CommunityBan, PostReply, User +from app.utils import gibberish, instance_banned + +from flask import current_app + + +""" JSON format +Delete: +{ + 'id': + 'type': + 'actor': + 'object': + '@context': + 'audience': + 'to': [] + 'cc': [] +} +For Announce, remove @context from inner object, and use same fields except audience +""" + + +@celery.task +def delete_reply(send_async, user_id, reply_id): + reply = PostReply.query.filter_by(id=reply_id).one() + delete_object(user_id, reply) + + +@celery.task +def restore_reply(send_async, user_id, reply_id): + reply = PostReply.query.filter_by(id=reply_id).one() + delete_object(user_id, reply, is_restore=True) + + +def delete_object(user_id, object, is_restore=False): + user = User.query.filter_by(id=user_id).one() + community = object.community + if community.local_only or not community.instance.online(): + return + + banned = CommunityBan.query.filter_by(user_id=user_id, community_id=community.id).first() + if banned: + return + if not community.is_local(): + if user.has_blocked_instance(community.instance.id) or instance_banned(community.instance.domain): + return + + delete_id = f"https://{current_app.config['SERVER_NAME']}/activities/delete/{gibberish(15)}" + to = ["https://www.w3.org/ns/activitystreams#Public"] + cc = [community.public_url()] + delete = { + 'id': delete_id, + 'type': 'Delete', + 'actor': user.public_url(), + 'object': object.public_url(), + '@context': default_context(), + 'audience': community.public_url(), + 'to': to, + 'cc': cc + } + + if is_restore: + del delete['@context'] + undo_id = f"https://{current_app.config['SERVER_NAME']}/activities/undo/{gibberish(15)}" + undo = { + 'id': undo_id, + 'type': 'Undo', + 'actor': user.public_url(), + 'object': delete, + '@context': default_context(), + 'audience': community.public_url(), + 'to': to, + 'cc': cc + } + + if community.is_local(): + if is_restore: + del undo['@context'] + object=undo + else: + del delete['@context'] + object=delete + + announce_id = f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}" + actor = community.public_url() + cc = [community.ap_followers_url] + announce = { + 'id': announce_id, + 'type': 'Announce', + 'actor': actor, + 'object': object, + '@context': default_context(), + 'to': to, + 'cc': cc + } + for instance in community.following_instances(): + if instance.inbox and instance.online() and not user.has_blocked_instance(instance.id) and not instance_banned(instance.domain): + post_request(instance.inbox, announce, community.private_key, community.public_url() + '#main-key') + else: + payload = undo if is_restore else delete + post_request(community.ap_inbox_url, payload, user.private_key, user.public_url() + '#main-key') + + diff --git a/app/shared/tasks/flags.py b/app/shared/tasks/flags.py new file mode 100644 index 00000000..f461794f --- /dev/null +++ b/app/shared/tasks/flags.py @@ -0,0 +1,58 @@ +from app import celery +from app.activitypub.signature import default_context, post_request +from app.models import CommunityBan, PostReply, User +from app.utils import gibberish, instance_banned + +from flask import current_app + + +""" JSON format +Flag: +{ + 'id': + 'type': + 'actor': + 'object': + '@context': + 'audience': + 'to': [] + 'summary': +} +""" + + +@celery.task +def report_reply(send_async, user_id, reply_id, summary): + reply = PostReply.query.filter_by(id=reply_id).one() + report_object(user_id, reply, summary) + + +def report_object(user_id, object, summary): + user = User.query.filter_by(id=user_id).one() + community = object.community + if community.local_only or not community.instance.online(): + return + + banned = CommunityBan.query.filter_by(user_id=user_id, community_id=community.id).first() + if banned: + return + if not community.is_local(): + if user.has_blocked_instance(community.instance.id) or instance_banned(community.instance.domain): + return + + flag_id = f"https://{current_app.config['SERVER_NAME']}/activities/flag/{gibberish(15)}" + to = [community.public_url()] + flag = { + 'id': flag_id, + 'type': 'Flag', + 'actor': user.public_url(), + 'object': object.public_url(), + '@context': default_context(), + 'audience': community.public_url(), + 'to': to, + 'summary': summary + } + + post_request(community.ap_inbox_url, flag, user.private_key, user.public_url() + '#main-key') + + diff --git a/app/shared/tasks/follows.py b/app/shared/tasks/follows.py new file mode 100644 index 00000000..97eafc8d --- /dev/null +++ b/app/shared/tasks/follows.py @@ -0,0 +1,165 @@ +from app import cache, celery, db +from app.activitypub.signature import default_context, post_request +from app.models import Community, CommunityBan, CommunityJoinRequest, User +from app.utils import community_membership, gibberish, joined_communities, instance_banned + +from flask import current_app, flash +from flask_babel import _ + + +# would be in app/constants.py +SRC_WEB = 1 +SRC_PUB = 2 +SRC_API = 3 +SRC_PLD = 4 + + +""" JSON format +{ + 'id': + 'type': + 'actor': + 'object': + '@context': (outer object only) + 'to': [] +} +""" + +""" +async: + delete memoized + add or delete community_join_request + used for admin preload in production (return values are ignored) + used for API +sync: + add or delete community_member + used for debug mode + used for web users to provide feedback +""" + + +@celery.task +def join_community(send_async, user_id, community_id, src): + user = User.query.filter_by(id=user_id).one() + community = Community.query.filter_by(id=community_id).one() + + pre_load_message = {} + banned = CommunityBan.query.filter_by(user_id=user_id, community_id=community_id).first() + if banned: + if not send_async: + if src == SRC_WEB: + flash(_('You cannot join this community')) + return + elif src == SRC_PLD: + pre_load_message['user_banned'] = True + return pre_load_message + elif src == SRC_API: + raise Exception('banned_from_community') + return + + if (not community.is_local() and + (user.has_blocked_instance(community.instance.id) or + instance_banned(community.instance.domain))): + if not send_async: + if src == SRC_WEB: + flash(_('Community is on banned or blocked instance')) + return + elif src == SRC_PLD: + pre_load_message['community_on_banned_or_blocked_instance'] = True + return pre_load_message + elif src == SRC_API: + raise Exception('community_on_banned_or_blocked_instance') + return + + success = True + if not community.is_local() and community.instance.online(): + join_request = CommunityJoinRequest(user_id=user_id, community_id=community_id) + db.session.add(join_request) + db.session.commit() + + follow_id = f"https://{current_app.config['SERVER_NAME']}/activities/follow/{join_request.id}" + follow = { + 'id': follow_id, + 'type': 'Follow', + 'actor': user.public_url(), + 'object': community.public_url(), + '@context': default_context(), + 'to': [community.public_url()], + } + success = post_request(community.ap_inbox_url, follow, user.private_key, + user.public_url() + '#main-key', timeout=10) + if success is False or isinstance(success, str): + if not send_async: + db.session.query(CommunityJoinRequest).filter_by(user_id=user_id, community_id=community_id).delete() + db.session.commit() + + if 'is not in allowlist' in success: + msg_to_user = f'{community.instance.domain} does not allow us to join their communities.' + else: + msg_to_user = "There was a problem while trying to communicate with remote server. Please try again later." + + if src == SRC_WEB: + flash(_(msg_to_user), 'error') + return + elif src == SRC_PLD: + pre_load_message['status'] = msg_to_user + return pre_load_message + elif src == SRC_API: + raise Exception(msg_to_user) + + # for communities on local or offline instances, joining is instant + if success is True: + cache.delete_memoized(community_membership, user, community) + cache.delete_memoized(joined_communities, user.id) + + if src == SRC_WEB: + flash('You joined ' + community.title) + return + elif src == SRC_PLD: + pre_load_message['status'] = 'joined' + return pre_load_message + + return success + + +@celery.task +def leave_community(send_async, user_id, community_id): + user = User.query.filter_by(id=user_id).one() + community = Community.query.filter_by(id=community_id).one() + + cache.delete_memoized(community_membership, user, community) + cache.delete_memoized(joined_communities, user.id) + + if community.is_local(): + return + + join_request = CommunityJoinRequest.query.filter_by(user_id=user_id, community_id=community_id).one() + db.session.delete(join_request) + db.session.commit() + + if (not community.instance.online() or + user.has_blocked_instance(community.instance.id) or + instance_banned(community.instance.domain)): + return + + follow_id = f"https://{current_app.config['SERVER_NAME']}/activities/follow/{join_request.id}" + follow = { + 'id': follow_id, + 'type': 'Follow', + 'actor': user.public_url(), + 'object': community.public_url(), + 'to': [community.public_url()] + } + undo_id = f"https://{current_app.config['SERVER_NAME']}/activities/undo/{gibberish(15)}" + undo = { + 'id': undo_id, + 'type': 'Undo', + 'actor': user.public_url(), + 'object': follow, + '@context': default_context(), + 'to': [community.public_url()] + } + + post_request(community.ap_inbox_url, undo, user.private_key, user.public_url() + '#main-key', timeout=10) + + diff --git a/app/shared/tasks/likes.py b/app/shared/tasks/likes.py new file mode 100644 index 00000000..33e21993 --- /dev/null +++ b/app/shared/tasks/likes.py @@ -0,0 +1,108 @@ +from app import cache, celery +from app.activitypub.signature import default_context, post_request +from app.models import CommunityBan, Post, PostReply, User +from app.utils import gibberish, instance_banned, recently_upvoted_posts, recently_downvoted_posts, recently_upvoted_post_replies, recently_downvoted_post_replies + +from flask import current_app + + +""" JSON format +{ + 'id': + 'type': + 'actor': + 'object': + '@context': (outer object only) + 'audience': (inner object only) + 'to': [] (announce only) + 'cc': [] (announce only) +} +""" + +@celery.task +def vote_for_post(send_async, user_id, post_id, vote_to_undo, vote_direction): + post = Post.query.filter_by(id=post_id).one() + cache.delete_memoized(recently_upvoted_posts, user_id) + cache.delete_memoized(recently_downvoted_posts, user_id) + send_vote(user_id, post, vote_to_undo, vote_direction) + + +@celery.task +def vote_for_reply(send_async, user_id, reply_id, vote_to_undo, vote_direction): + reply = PostReply.query.filter_by(id=reply_id).one() + cache.delete_memoized(recently_upvoted_post_replies, user_id) + cache.delete_memoized(recently_downvoted_post_replies, user_id) + send_vote(user_id, reply, vote_to_undo, vote_direction) + + +def send_vote(user_id, object, vote_to_undo, vote_direction): + user = User.query.filter_by(id=user_id).one() + community = object.community + if community.local_only or not community.instance.online(): + return + + banned = CommunityBan.query.filter_by(user_id=user_id, community_id=community.id).first() + if banned: + return + if not community.is_local(): + if user.has_blocked_instance(community.instance.id) or instance_banned(community.instance.domain): + return + + if vote_to_undo: + type=vote_to_undo + else: + type = 'Like' if vote_direction == 'upvote' else 'Dislike' + vote_id = f"https://{current_app.config['SERVER_NAME']}/activities/{type.lower()}/{gibberish(15)}" + actor = user.public_url(not(community.instance.votes_are_public() and user.vote_privately())) + + vote = { + 'id': vote_id, + 'type': type, + 'actor': actor, + 'object': object.public_url(), + '@context': default_context(), + 'audience': community.public_url() + } + + if vote_to_undo: + del vote['@context'] + undo_id = f"https://{current_app.config['SERVER_NAME']}/activities/undo/{gibberish(15)}" + undo = { + 'id': undo_id, + 'type': 'Undo', + 'actor': actor, + 'object': vote, + '@context': default_context(), + 'audience': community.public_url() + } + + if community.is_local(): + if vote_to_undo: + del undo['@context'] + object=undo + else: + del vote['@context'] + object=vote + + announce_id = f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}" + actor = community.public_url() + to = ["https://www.w3.org/ns/activitystreams#Public"] + cc = [community.ap_followers_url] + announce = { + 'id': announce_id, + 'type': 'Announce', + 'actor': actor, + 'object': object, + '@context': default_context(), + 'to': to, + 'cc': cc + } + for instance in community.following_instances(): + if instance.inbox and instance.online() and not user.has_blocked_instance(instance.id) and not instance_banned(instance.domain): + post_request(instance.inbox, announce, community.private_key, community.public_url() + '#main-key') + else: + payload = undo if vote_to_undo else vote + post_request(community.ap_inbox_url, payload, user.private_key, + user.public_url(not(community.instance.votes_are_public() and user.vote_privately())) + '#main-key') + + diff --git a/app/shared/tasks/notes.py b/app/shared/tasks/notes.py new file mode 100644 index 00000000..006c6cf0 --- /dev/null +++ b/app/shared/tasks/notes.py @@ -0,0 +1,205 @@ +from app import cache, celery, db +from app.activitypub.signature import default_context, post_request +from app.models import Community, CommunityBan, CommunityJoinRequest, CommunityMember, Notification, Post, PostReply, User, utcnow +from app.user.utils import search_for_user +from app.utils import community_membership, gibberish, joined_communities, instance_banned, ap_datetime, \ + recently_upvoted_posts, recently_downvoted_posts, recently_upvoted_post_replies, recently_downvoted_post_replies + +from flask import current_app +from flask_babel import _ + +import re + + +""" Reply JSON format +{ + 'id': + 'url': + 'type': + 'attributedTo': + 'to': [] + 'cc': [] + 'tag': [] + 'audience': + 'content': + 'mediaType': + 'source': {} + 'inReplyTo': + 'published': + 'updated': (inner oject of Update only) + 'language': {} + 'contentMap':{} + 'distinguished' +} +""" +""" Create / Update / Announce JSON format +{ + 'id': + 'type': + 'actor': + 'object': + 'to': [] + 'cc': [] + '@context': (outer object only) + 'audience': (not in Announce) + 'tag': [] (not in Announce) +} +""" + + + +@celery.task +def make_reply(send_async, user_id, reply_id, parent_id): + send_reply(user_id, reply_id, parent_id) + + +@celery.task +def edit_reply(send_async, user_id, reply_id, parent_id): + send_reply(user_id, reply_id, parent_id, edit=True) + + +def send_reply(user_id, reply_id, parent_id, edit=False): + user = User.query.filter_by(id=user_id).one() + reply = PostReply.query.filter_by(id=reply_id).one() + if parent_id: + parent = PostReply.query.filter_by(id=parent_id).one() + else: + parent = reply.post + community = reply.community + + recipients = [parent.author] + pattern = r"@([a-zA-Z0-9_.-]*)@([a-zA-Z0-9_.-]*)\b" + matches = re.finditer(pattern, reply.body) + for match in matches: + recipient = None + if match.group(2) == current_app.config['SERVER_NAME']: + user_name = match.group(1) + try: + recipient = search_for_user(user_name) + except: + pass + else: + ap_id = f"{match.group(1)}@{match.group(2)}" + try: + recipient = search_for_user(ap_id) + except: + pass + if recipient: + add_recipient = True + for existing_recipient in recipients: + if ((not recipient.ap_id and recipient.user_name == existing_recipient.user_name) or + (recipient.ap_id and recipient.ap_id == existing_recipient.ap_id)): + add_recipient = False + break + if add_recipient: + recipients.append(recipient) + + if community.local_only: + for recipient in recipients: + if recipient.is_local() and recipient.id != parent.author.id: + already_notified = cache.get(f'{recipient.id} notified of {reply.id}') + if not already_notified: + cache.set(f'{recipient.id} notified of {reply.id}', True, timeout=86400) + notification = Notification(user_id=recipient.id, title=_('You have been mentioned in a comment'), + url=f"https://{current_app.config['SERVER_NAME']}/comment/{reply.id}", + author_id=user.id) + recipient.unread_notifications += 1 + db.session.add(notification) + db.session.commit() + + if community.local_only or not community.instance.online(): + return + + banned = CommunityBan.query.filter_by(user_id=user_id, community_id=community.id).first() + if banned: + return + if not community.is_local(): + if user.has_blocked_instance(community.instance.id) or instance_banned(community.instance.domain): + return + + to = ["https://www.w3.org/ns/activitystreams#Public"] + cc = [community.public_url()] + tag = [] + for recipient in recipients: + tag.append({'href': recipient.public_url(), 'name': recipient.mention_tag(), 'type': 'Mention'}) + cc.append(recipient.public_url()) + language = {'identifier': reply.language_code(), 'name': reply.language_name()} + content_map = {reply.language_code(): reply.body_html} + source = {'content': reply.body, 'mediaType': 'text/markdown'} + note = { + 'id': reply.public_url(), + 'url': reply.public_url(), + 'type': 'Note', + 'attributedTo': user.public_url(), + 'to': to, + 'cc': cc, + 'tag': tag, + 'audience': community.public_url(), + 'content': reply.body_html, + 'mediaType': 'text/html', + 'source': source, + 'inReplyTo': parent.public_url(), + 'published': ap_datetime(reply.posted_at), + 'language': language, + 'contentMap': content_map, + 'distinguished': False, + } + if edit: + note['updated']: ap_datetime(utcnow()) + + activity = 'create' if not edit else 'update' + create_id = f"https://{current_app.config['SERVER_NAME']}/activities/{activity}/{gibberish(15)}" + type = 'Create' if not edit else 'Update' + create = { + 'id': create_id, + 'type': type, + 'actor': user.public_url(), + 'object': note, + 'to': to, + 'cc': cc, + '@context': default_context(), + 'tag': tag + } + + domains_sent_to = [current_app.config['SERVER_NAME']] + + if community.is_local(): + del create['@context'] + + announce_id = f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}" + actor = community.public_url() + cc = [community.ap_followers_url] + announce = { + 'id': announce_id, + 'type': 'Announce', + 'actor': community.public_url(), + 'object': create, + 'to': to, + 'cc': cc, + '@context': default_context() + } + for instance in community.following_instances(): + if instance.inbox and instance.online() and not user.has_blocked_instance(instance.id) and not instance_banned(instance.domain): + post_request(instance.inbox, announce, community.private_key, community.public_url() + '#main-key') + domains_sent_to.append(instance.domain) + else: + post_request(community.ap_inbox_url, create, user.private_key, user.public_url() + '#main-key') + domains_sent_to.append(community.instance.domain) + + # send copy to anyone else Mentioned in reply. (mostly for other local users and users on microblog sites) + for recipient in recipients: + if recipient.instance.domain not in domains_sent_to: + post_request(recipient.instance.inbox, create, user.private_key, user.public_url() + '#main-key') + if recipient.is_local() and recipient.id != parent.author.id: + already_notified = cache.get(f'{recipient.id} notified of {reply.id}') + if not already_notified: + cache.set(f'{recipient.id} notified of {reply.id}', True, timeout=86400) + notification = Notification(user_id=recipient.id, title=_('You have been mentioned in a comment'), + url=f"https://{current_app.config['SERVER_NAME']}/comment/{reply.id}", + author_id=user.id) + recipient.unread_notifications += 1 + db.session.add(notification) + db.session.commit() + + + diff --git a/app/user/utils.py b/app/user/utils.py index 50bbe305..6bf77760 100644 --- a/app/user/utils.py +++ b/app/user/utils.py @@ -113,6 +113,8 @@ def unsubscribe_from_community(community, user): def search_for_user(address: str): + if address.startswith('@'): + address = address[1:] if '@' in address: name, server = address.lower().split('@') else: @@ -126,14 +128,17 @@ def search_for_user(address: str): raise Exception(f"{server} is blocked.{reason}") already_exists = User.query.filter_by(ap_id=address).first() else: - already_exists = User.query.filter_by(user_name=name).first() + already_exists = User.query.filter_by(user_name=name, ap_id=None).first() if already_exists: return already_exists + if not server: + return None + # Look up the profile address of the user using WebFinger # todo: try, except block around every get_request webfinger_data = get_request(f"https://{server}/.well-known/webfinger", - params={'resource': f"acct:{address[1:]}"}) + params={'resource': f"acct:{address}"}) if webfinger_data.status_code == 200: webfinger_json = webfinger_data.json() for links in webfinger_json['links']: