From 1654dcea0b7b6374039ce81548bc2b667646a15c Mon Sep 17 00:00:00 2001 From: freamon Date: Sun, 19 Jan 2025 19:31:31 +0000 Subject: [PATCH] Consolidate retrieve_mods_and_backfill() functions to support outbox processing for lemmy, peertube, and wordpress --- app/community/util.py | 270 ++++++++++++++++++++++-------------------- 1 file changed, 139 insertions(+), 131 deletions(-) diff --git a/app/community/util.py b/app/community/util.py index 11f54780..50e23023 100644 --- a/app/community/util.py +++ b/app/community/util.py @@ -10,9 +10,9 @@ from flask_login import current_user from pillow_heif import register_heif_opener from app import db, cache, celery -from app.activitypub.signature import post_request, default_context +from app.activitypub.signature import post_request, default_context, signed_get_request from app.activitypub.util import find_actor_or_create, actor_json_to_model, post_json_to_model, ensure_domains_match, \ - find_hashtag_or_create + find_hashtag_or_create, create_post from app.constants import POST_TYPE_ARTICLE, POST_TYPE_LINK, POST_TYPE_IMAGE, POST_TYPE_VIDEO, NOTIF_POST, \ POST_TYPE_POLL from app.models import Community, File, BannedInstances, PostReply, Post, utcnow, CommunityMember, Site, \ @@ -71,159 +71,167 @@ def search_for_community(address: str): if community_json['type'] == 'Group': community = actor_json_to_model(community_json, name, server) if community: - if community.ap_profile_id == f"https://{server}/video-channels/{name}": - if current_app.debug: - retrieve_peertube_mods_and_backfill(community.id, community_json['attributedTo']) - else: - retrieve_peertube_mods_and_backfill.delay(community.id, community_json['attributedTo']) - return community if current_app.debug: - retrieve_mods_and_backfill(community.id) + retrieve_mods_and_backfill(community.id, server, name, community_json) else: - retrieve_mods_and_backfill.delay(community.id) + retrieve_mods_and_backfill.delay(community.id, server, name, community_json) return community return None -@celery.task -def retrieve_peertube_mods_and_backfill(community_id: int, mods: list): - community = Community.query.get(community_id) - site = Site.query.get(1) - for m in mods: - user = find_actor_or_create(m['id']) - if user: - existing_membership = CommunityMember.query.filter_by(community_id=community.id, user_id=user.id).first() - if existing_membership: - existing_membership.is_moderator = True - else: - new_membership = CommunityMember(community_id=community.id, user_id=user.id, is_moderator=True) - db.session.add(new_membership) - community.restricted_to_mods = True - db.session.commit() - - if community.ap_public_url: - outbox_request = get_request(community.ap_outbox_url, headers={'Accept': 'application/activity+json'}) - if outbox_request.status_code == 200: - outbox_data = outbox_request.json() - outbox_request.close() - if 'totalItems' in outbox_data and outbox_data['totalItems'] > 0: - page1_request = get_request(outbox_data['first'], headers={'Accept': 'application/activity+json'}) - if page1_request.status_code == 200: - page1_data = page1_request.json() - page1_request.close() - if 'type' in page1_data and page1_data['type'] == 'OrderedCollectionPage' and 'orderedItems' in page1_data: - # only 10 posts per page for PeerTube - for activity in page1_data['orderedItems']: - video_request = get_request(activity['object'], headers={'Accept': 'application/activity+json'}) - if video_request.status_code == 200: - video_data = video_request.json() - video_request.close() - activity_log = ActivityPubLog(direction='in', activity_id=video_data['id'], activity_type='Video', result='failure') - if site.log_activitypub_json: - activity_log.activity_json = json.dumps(video_data) - db.session.add(activity_log) - if not ensure_domains_match(video_data): - activity_log.exception_message = 'Domains do not match' - db.session.commit() - continue - if user and user.is_local(): - activity_log.exception_message = 'Activity about local content which is already present' - db.session.commit() - continue - if user: - post = post_json_to_model(activity_log, video_data, user, community) - post.ap_announce_id = activity['id'] - post.ranking = post.post_ranking(post.score, post.posted_at) - else: - activity_log.exception_message = 'Could not find or create actor' - db.session.commit() - if community.post_count > 0: - community.last_active = Post.query.filter(Post.community_id == community_id).order_by(desc(Post.posted_at)).first().posted_at - db.session.commit() +def remote_object_to_json(uri): + try: + object_request = get_request(uri, headers={'Accept': 'application/activity+json'}) + except httpx.HTTPError: + time.sleep(3) + try: + object_request = get_request(uri, headers={'Accept': 'application/activity+json'}) + except httpx.HTTPError: + return None + if object_request.status_code == 200: + try: + object = object_request.json() + return object + except: + object_request.close() + return None + object_request.close() + elif object_request.status_code == 401: + try: + site = Site.query.get(1) + object_request = signed_get_request(uri, site.private_key, f"https://{current_app.config['SERVER_NAME']}/actor#main-key") + except httpx.HTTPError: + time.sleep(3) + try: + object_request = signed_get_request(uri, site.private_key, f"https://{current_app.config['SERVER_NAME']}/actor#main-key") + except httpx.HTTPError: + return None + try: + object = object_request.json() + return object + except: + object_request.close() + return None + object_request.close() + else: + return None @celery.task -def retrieve_mods_and_backfill(community_id: int): +def retrieve_mods_and_backfill(community_id: int, server, name, community_json=None): with current_app.app_context(): community = Community.query.get(community_id) + if not community: + return site = Site.query.get(1) - if community.ap_moderators_url: - mods_request = get_request(community.ap_moderators_url, headers={'Accept': 'application/activity+json'}) - if mods_request.status_code == 200: - mods_data = mods_request.json() - mods_request.close() - if mods_data and mods_data['type'] == 'OrderedCollection' and 'orderedItems' in mods_data: - for actor in mods_data['orderedItems']: - sleep(0.5) - user = find_actor_or_create(actor) - if user: - existing_membership = CommunityMember.query.filter_by(community_id=community.id, user_id=user.id).first() + + is_peertube = is_guppe = is_wordpress = False + if community.ap_profile_id == f"https://{server}/video-channels/{name}": + is_peertube = True + elif community.ap_profile_id.startswith('https://a.gup.pe/u'): + is_guppe = True + + # get mods + if community_json and 'attributedTo' in community_json: + mods = community_json['attributedTo'] + if isinstance(mods, list): + for m in mods: + if 'type' in m and m['type'] == 'Person' and 'id' in m: + mod = find_actor_or_create(m['id']) + if mod: + existing_membership = CommunityMember.query.filter_by(community_id=community.id, user_id=mod.id).first() if existing_membership: existing_membership.is_moderator = True else: - new_membership = CommunityMember(community_id=community.id, user_id=user.id, is_moderator=True) + new_membership = CommunityMember(community_id=community.id, user_id=mod.id, is_moderator=True) db.session.add(new_membership) - db.session.commit() + elif community.ap_moderators_url: + mods_data = remote_object_to_json(community.ap_moderators_url) + if mods_data and mods_data['type'] == 'OrderedCollection' and 'orderedItems' in mods_data: + for actor in mods_data['orderedItems']: + sleep(0.5) + mod = find_actor_or_create(actor) + if mod: + existing_membership = CommunityMember.query.filter_by(community_id=community.id, user_id=mod.id).first() + if existing_membership: + existing_membership.is_moderator = True + else: + new_membership = CommunityMember(community_id=community.id, user_id=mod.id, is_moderator=True) + db.session.add(new_membership) + if is_peertube: + community.restricted_to_mods = True + db.session.commit() # only backfill nsfw if nsfw communities are allowed if (community.nsfw and not site.enable_nsfw) or (community.nsfl and not site.enable_nsfl): return - # download 50 old posts + # download 50 old posts from unpaginated outboxes or 10 posts from page 1 if outbox is paginated (with Celery, or just 2 without) if community.ap_outbox_url: - outbox_request = get_request(community.ap_outbox_url, headers={'Accept': 'application/activity+json'}) - if outbox_request.status_code == 200: - outbox_data = outbox_request.json() - outbox_request.close() - if 'type' in outbox_data and outbox_data['type'] == 'OrderedCollection' and 'orderedItems' in outbox_data: - activities_processed = 0 - for activity in outbox_data['orderedItems']: - activity_log = ActivityPubLog(direction='in', activity_id=activity['id'], activity_type='Announce', result='failure') - if site.log_activitypub_json: - activity_log.activity_json = json.dumps(activity) - db.session.add(activity_log) - if 'object' in activity and 'object' in activity['object']: - if not ensure_domains_match(activity['object']['object']): - activity_log.exception_message = 'Domains do not match' - db.session.commit() - continue - user = find_actor_or_create(activity['object']['actor']) - if user and user.is_local(): - activity_log.exception_message = 'Activity about local content which is already present' - db.session.commit() - continue - if user: - post = post_json_to_model(activity_log, activity['object']['object'], user, community) - if post: - post.ap_create_id = activity['object']['id'] - post.ap_announce_id = activity['id'] - post.ranking = post.post_ranking(post.score, post.posted_at) - if post.url: - post.calculate_cross_posts() - db.session.commit() - else: - activity_log.exception_message = 'Could not find or create actor' + outbox_data = remote_object_to_json(community.ap_outbox_url) + if not outbox_data or ('totalItems' in outbox_data and outbox_data['totalItems'] == 0): + return + if 'first' in outbox_data: + outbox_data = remote_object_to_json(outbox_data['first']) + if not outbox_data: + return + max = 10 + else: + max = 50 + if current_app.debug: + max = 2 + if 'type' in outbox_data and (outbox_data['type'] == 'OrderedCollection' or outbox_data['type'] == 'OrderedCollectionPage') and 'orderedItems' in outbox_data: + activities_processed = 0 + for announce in outbox_data['orderedItems']: + activity = None + if is_peertube or is_guppe: + activity = remote_object_to_json(announce['object']) + elif 'object' in announce and 'object' in announce['object']: + activity = announce['object']['object'] + elif 'type' in announce and announce['type'] == 'Create': + activity = announce['object'] + is_wordpress = True + if not activity: + return + if not ensure_domains_match(activity): + continue + if is_peertube: + user = mod + elif 'attributedTo' in activity and isinstance(activity['attributedTo'], str): + user = find_actor_or_create(activity['attributedTo']) + if not user: + continue + else: + continue + if user.is_local(): + continue + if is_peertube or is_guppe: + request_json = {'id': f"https://{server}/activities/create/{gibberish(15)}", 'object': activity} + elif is_wordpress: + request_json = announce + else: + request_json = announce['object'] + post = create_post(True, community, request_json, user, announce['id']) + if post: + if 'published' in activity: + post.posted_at = activity['published'] + post.last_active = activity['published'] db.session.commit() - - activities_processed += 1 - if activities_processed >= 50: - break - c = Community.query.get(community.id) - if c.post_count > 0: - c.last_active = Post.query.filter(Post.community_id == community_id).order_by(desc(Post.posted_at)).first().posted_at + activities_processed += 1 + if activities_processed >= max: + break + if community.post_count > 0: + community.last_active = Post.query.filter(Post.community_id == community.id).order_by(desc(Post.posted_at)).first().posted_at db.session.commit() - if community.ap_featured_url: - featured_request = get_request(community.ap_featured_url, headers={'Accept': 'application/activity+json'}) - if featured_request.status_code == 200: - featured_data = featured_request.json() - featured_request.close() - if featured_data['type'] == 'OrderedCollection' and 'orderedItems' in featured_data: - for item in featured_data['orderedItems']: - featured_id = item['id'] - p = Post.query.filter(Post.ap_id == featured_id).first() - if p: - p.sticky = True - db.session.commit() + if community.ap_featured_url: + featured_data = remote_object_to_json(community.ap_featured_url) + if featured_data and 'type' in featured_data and featured_data['type'] == 'OrderedCollection' and 'orderedItems' in featured_data: + for item in featured_data['orderedItems']: + featured_id = item['id'] + p = Post.query.filter(Post.ap_id == featured_id).first() + if p: + p.sticky = True + db.session.commit() def actor_to_community(actor) -> Community: