diff --git a/app/activitypub/util.py b/app/activitypub/util.py index 06ff7843..5a53f6b2 100644 --- a/app/activitypub/util.py +++ b/app/activitypub/util.py @@ -2400,112 +2400,92 @@ def resolve_remote_post(uri: str, community_id: int, announce_actor=None, store_ return None +# called from UI, via 'search' option in navbar def resolve_remote_post_from_search(uri: str) -> Union[Post, None]: post = Post.query.filter_by(ap_id=uri).first() if post: return post - site = Site.query.get(1) - parsed_url = urlparse(uri) uri_domain = parsed_url.netloc actor_domain = None actor = None - post_request = get_request(uri, headers={'Accept': 'application/activity+json'}) - if post_request.status_code == 200: - post_data = post_request.json() - post_request.close() - # check again that it doesn't already exist (can happen with different but equivalent URLs) - post = Post.query.filter_by(ap_id=post_data['id']).first() - if post: - return post - # find the author of the post. Make sure their domain matches the site hosting it to mitigate impersonation attempts - if 'attributedTo' in post_data: - attributed_to = post_data['attributedTo'] - if isinstance(attributed_to, str): - actor = attributed_to - parsed_url = urlparse(actor) - actor_domain = parsed_url.netloc - elif isinstance(attributed_to, list): - for a in attributed_to: - if isinstance(a, dict) and a.get('type') == 'Person': - actor = a.get('id') - if isinstance(actor, str): # Ensure `actor` is a valid string - parsed_url = urlparse(actor) - actor_domain = parsed_url.netloc - break - elif isinstance(a, str): - actor = a + try: + object_request = get_request(uri, headers={'Accept': 'application/activity+json'}) + except httpx.HTTPError: + time.sleep(3) + try: + object_request = get_request(uri, headers={'Accept': 'application/activity+json'}) + except httpx.HTTPError: + return None + if object_request.status_code == 200: + try: + post_data = object_request.json() + except: + object_request.close() + return None + object_request.close() + elif object_request.status_code == 401: + try: + site = Site.query.get(1) + object_request = signed_get_request(uri, site.private_key, f"https://{current_app.config['SERVER_NAME']}/actor#main-key") + except httpx.HTTPError: + time.sleep(3) + try: + object_request = signed_get_request(uri, site.private_key, f"https://{current_app.config['SERVER_NAME']}/actor#main-key") + except httpx.HTTPError: + return None + try: + post_data = object_request.json() + except: + object_request.close() + return None + object_request.close() + else: + return None + + # check again that it doesn't already exist (can happen with different but equivalent URLs) + post = Post.query.filter_by(ap_id=post_data['id']).first() + if post: + return post + + # find the author of the post. Make sure their domain matches the site hosting it to mitigate impersonation attempts + if 'attributedTo' in post_data: + attributed_to = post_data['attributedTo'] + if isinstance(attributed_to, str): + actor = attributed_to + parsed_url = urlparse(actor) + actor_domain = parsed_url.netloc + elif isinstance(attributed_to, list): + for a in attributed_to: + if isinstance(a, dict) and a.get('type') == 'Person': + actor = a.get('id') + if isinstance(actor, str): # Ensure `actor` is a valid string parsed_url = urlparse(actor) actor_domain = parsed_url.netloc - break - if uri_domain != actor_domain: - return None + break + elif isinstance(a, str): + actor = a + parsed_url = urlparse(actor) + actor_domain = parsed_url.netloc + break + if uri_domain != actor_domain: + return None - # find the community the post was submitted to - community = None - if not community and post_data['type'] == 'Page': # lemmy - if 'audience' in post_data: - community_id = post_data['audience'] - community = find_actor_or_create(community_id, community_only=True) - - if not community and post_data['type'] == 'Video': # peertube - if 'attributedTo' in post_data and isinstance(post_data['attributedTo'], list): - for a in post_data['attributedTo']: - if a['type'] == 'Group': - community_id = a['id'] - community = find_actor_or_create(community_id, community_only=True) - if community: - break - - if not community: # mastodon, etc - if 'inReplyTo' not in post_data or post_data['inReplyTo'] != None: - return None - - if not community and 'to' in post_data and isinstance(post_data['to'], str): - community_id = post_data['to'].lower() - if not community_id == 'https://www.w3.org/ns/activitystreams#Public' and not community_id.endswith('/followers'): - community = Community.query.filter_by(ap_profile_id=community_id).first() - if not community and 'cc' in post_data and isinstance(post_data['cc'], str): - community_id = post_data['cc'].lower() - if not community_id == 'https://www.w3.org/ns/activitystreams#Public' and not community_id.endswith('/followers'): - community = Community.query.filter_by(ap_profile_id=community_id).first() - if not community and 'to' in post_data and isinstance(post_data['to'], list): - for t in post_data['to']: - community_id = t.lower() - if not community_id == 'https://www.w3.org/ns/activitystreams#Public' and not community_id.endswith('/followers'): - community = Community.query.filter_by(ap_profile_id=community_id).first() - if community: - break - if not community and 'cc' in post_data and isinstance(post_data['to'], list): - for c in post_data['cc']: - community_id = c.lower() - if not community_id == 'https://www.w3.org/ns/activitystreams#Public' and not community_id.endswith('/followers'): - community = Community.query.filter_by(ap_profile_id=community_id).first() - if community: - break - - if not community: - return None - - activity_log = ActivityPubLog(direction='in', activity_id=post_data['id'], activity_type='Resolve Post', result='failure') - if site.log_activitypub_json: - activity_log.activity_json = json.dumps(post_data) - db.session.add(activity_log) - user = find_actor_or_create(actor) - if user and community and post_data: - request_json = { - 'id': f"https://{uri_domain}/activities/create/gibberish(15)", - 'object': post_data - } - post = create_post(activity_log, community, request_json, user) - if post: - if 'published' in post_data: - post.posted_at=post_data['published'] - post.last_active=post_data['published'] - db.session.commit() - return post + # find the community the post was submitted to + community = find_community(post_data) + # find the post's author + user = find_actor_or_create(actor) + if user and community and post_data: + request_json = {'id': f"https://{uri_domain}/activities/create/gibberish(15)", 'object': post_data} + post = create_post(False, community, request_json, user) + if post: + if 'published' in post_data: + post.posted_at=post_data['published'] + post.last_active=post_data['published'] + db.session.commit() + return post return None