from datetime import datetime, timedelta from time import sleep from random import randint from typing import List import httpx from PIL import Image, ImageOps from flask import request, abort, g, current_app, json from flask_login import current_user from pillow_heif import register_heif_opener from app import db, cache, celery from app.activitypub.signature import post_request, default_context, signed_get_request from app.activitypub.util import find_actor_or_create, actor_json_to_model, post_json_to_model, ensure_domains_match, \ find_hashtag_or_create, create_post from app.constants import POST_TYPE_ARTICLE, POST_TYPE_LINK, POST_TYPE_IMAGE, POST_TYPE_VIDEO, NOTIF_POST, \ POST_TYPE_POLL from app.models import Community, File, BannedInstances, PostReply, Post, utcnow, CommunityMember, Site, \ Instance, Notification, User, ActivityPubLog, NotificationSubscription, PollChoice, Poll, Tag from app.utils import get_request, gibberish, markdown_to_html, domain_from_url, \ is_image_url, ensure_directory_exists, shorten_string, \ remove_tracking_from_link, ap_datetime, instance_banned, blocked_phrases, url_to_thumbnail_file, opengraph_parse, \ piefed_markdown_to_lemmy_markdown, get_task_session from sqlalchemy import func, desc, text import os allowed_extensions = ['.gif', '.jpg', '.jpeg', '.png', '.webp', '.heic', '.mpo', '.avif', '.svg'] def search_for_community(address: str): if address.startswith('!'): name, server = address[1:].split('@') banned = BannedInstances.query.filter_by(domain=server).first() if banned: reason = f" Reason: {banned.reason}" if banned.reason is not None else '' raise Exception(f"{server} is blocked.{reason}") # todo: create custom exception class hierarchy if current_app.config['SERVER_NAME'] == server: already_exists = Community.query.filter_by(name=name, ap_id=None).first() return already_exists already_exists = Community.query.filter_by(ap_id=address[1:]).first() if already_exists: return already_exists # Look up the profile address of the community using WebFinger try: webfinger_data = get_request(f"https://{server}/.well-known/webfinger", params={'resource': f"acct:{address[1:]}"}) except httpx.HTTPError: sleep(randint(3, 10)) try: webfinger_data = get_request(f"https://{server}/.well-known/webfinger", params={'resource': f"acct:{address[1:]}"}) except httpx.HTTPError: return None if webfinger_data.status_code == 200: webfinger_json = webfinger_data.json() for links in webfinger_json['links']: if 'rel' in links and links['rel'] == 'self': # this contains the URL of the activitypub profile type = links['type'] if 'type' in links else 'application/activity+json' # retrieve the activitypub profile community_data = get_request(links['href'], headers={'Accept': type}) # to see the structure of the json contained in community_data, do a GET to https://lemmy.world/c/technology with header Accept: application/activity+json if community_data.status_code == 200: community_json = community_data.json() community_data.close() if community_json['type'] == 'Group': community = actor_json_to_model(community_json, name, server) if community: if current_app.debug: retrieve_mods_and_backfill(community.id, server, name, community_json) else: retrieve_mods_and_backfill.delay(community.id, server, name, community_json) return community return None def remote_object_to_json(uri): try: object_request = get_request(uri, headers={'Accept': 'application/activity+json'}) except httpx.HTTPError: time.sleep(3) try: object_request = get_request(uri, headers={'Accept': 'application/activity+json'}) except httpx.HTTPError: return None if object_request.status_code == 200: try: object = object_request.json() return object except: object_request.close() return None object_request.close() elif object_request.status_code == 401: try: site = Site.query.get(1) object_request = signed_get_request(uri, site.private_key, f"https://{current_app.config['SERVER_NAME']}/actor#main-key") except httpx.HTTPError: time.sleep(3) try: object_request = signed_get_request(uri, site.private_key, f"https://{current_app.config['SERVER_NAME']}/actor#main-key") except httpx.HTTPError: return None try: object = object_request.json() return object except: object_request.close() return None object_request.close() else: return None @celery.task def retrieve_mods_and_backfill(community_id: int, server, name, community_json=None): with current_app.app_context(): community = Community.query.get(community_id) if not community: return site = Site.query.get(1) is_peertube = is_guppe = is_wordpress = False if community.ap_profile_id == f"https://{server}/video-channels/{name}": is_peertube = True elif community.ap_profile_id.startswith('https://a.gup.pe/u'): is_guppe = True # get mods if community_json and 'attributedTo' in community_json: mods = community_json['attributedTo'] if isinstance(mods, list): for m in mods: if 'type' in m and m['type'] == 'Person' and 'id' in m: mod = find_actor_or_create(m['id']) if mod: existing_membership = CommunityMember.query.filter_by(community_id=community.id, user_id=mod.id).first() if existing_membership: existing_membership.is_moderator = True else: new_membership = CommunityMember(community_id=community.id, user_id=mod.id, is_moderator=True) db.session.add(new_membership) elif community.ap_moderators_url: mods_data = remote_object_to_json(community.ap_moderators_url) if mods_data and mods_data['type'] == 'OrderedCollection' and 'orderedItems' in mods_data: for actor in mods_data['orderedItems']: sleep(0.5) mod = find_actor_or_create(actor) if mod: existing_membership = CommunityMember.query.filter_by(community_id=community.id, user_id=mod.id).first() if existing_membership: existing_membership.is_moderator = True else: new_membership = CommunityMember(community_id=community.id, user_id=mod.id, is_moderator=True) db.session.add(new_membership) if is_peertube: community.restricted_to_mods = True db.session.commit() # only backfill nsfw if nsfw communities are allowed if (community.nsfw and not site.enable_nsfw) or (community.nsfl and not site.enable_nsfl): return # download 50 old posts from unpaginated outboxes or 10 posts from page 1 if outbox is paginated (with Celery, or just 2 without) if community.ap_outbox_url: outbox_data = remote_object_to_json(community.ap_outbox_url) if not outbox_data or ('totalItems' in outbox_data and outbox_data['totalItems'] == 0): return if 'first' in outbox_data: outbox_data = remote_object_to_json(outbox_data['first']) if not outbox_data: return max = 10 else: max = 50 if current_app.debug: max = 2 if 'type' in outbox_data and (outbox_data['type'] == 'OrderedCollection' or outbox_data['type'] == 'OrderedCollectionPage') and 'orderedItems' in outbox_data: activities_processed = 0 for announce in outbox_data['orderedItems']: activity = None if is_peertube or is_guppe: activity = remote_object_to_json(announce['object']) elif 'object' in announce and 'object' in announce['object']: activity = announce['object']['object'] elif 'type' in announce and announce['type'] == 'Create': activity = announce['object'] is_wordpress = True if not activity: return if not ensure_domains_match(activity): continue if is_peertube: user = mod elif 'attributedTo' in activity and isinstance(activity['attributedTo'], str): user = find_actor_or_create(activity['attributedTo']) if not user: continue else: continue if user.is_local(): continue if is_peertube or is_guppe: request_json = {'id': f"https://{server}/activities/create/{gibberish(15)}", 'object': activity} elif is_wordpress: request_json = announce else: request_json = announce['object'] post = create_post(True, community, request_json, user, announce['id']) if post: if 'published' in activity: post.posted_at = activity['published'] post.last_active = activity['published'] db.session.commit() activities_processed += 1 if activities_processed >= max: break if community.post_count > 0: community.last_active = Post.query.filter(Post.community_id == community.id).order_by(desc(Post.posted_at)).first().posted_at db.session.commit() if community.ap_featured_url: featured_data = remote_object_to_json(community.ap_featured_url) if featured_data and 'type' in featured_data and featured_data['type'] == 'OrderedCollection' and 'orderedItems' in featured_data: for item in featured_data['orderedItems']: featured_id = item['id'] p = Post.query.filter(Post.ap_id == featured_id).first() if p: p.sticky = True db.session.commit() def actor_to_community(actor) -> Community: actor = actor.strip() if '@' in actor: community = Community.query.filter_by(banned=False, ap_id=actor).first() else: community = Community.query.filter(func.lower(Community.name) == func.lower(actor)).filter_by(banned=False, ap_id=None).first() return community def end_poll_date(end_choice): delta_mapping = { '30m': timedelta(minutes=30), '1h': timedelta(hours=1), '6h': timedelta(hours=6), '12h': timedelta(hours=12), '1d': timedelta(days=1), '3d': timedelta(days=3), '7d': timedelta(days=7) } if end_choice in delta_mapping: return datetime.utcnow() + delta_mapping[end_choice] else: raise ValueError("Invalid choice") def tags_from_string(tags: str) -> List[dict]: return_value = [] tags = tags.strip() if tags == '': return [] tag_list = tags.split(',') tag_list = [tag.strip() for tag in tag_list] for tag in tag_list: if tag[0] == '#': tag = tag[1:] tag_to_append = find_hashtag_or_create(tag) if tag_to_append: return_value.append({'type': 'Hashtag', 'name': tag_to_append.name}) return return_value def tags_from_string_old(tags: str) -> List[Tag]: return_value = [] tags = tags.strip() if tags == '': return [] tag_list = tags.split(',') tag_list = [tag.strip() for tag in tag_list] for tag in tag_list: if tag[0] == '#': tag = tag[1:] tag_to_append = find_hashtag_or_create(tag) if tag_to_append: return_value.append(tag_to_append) return return_value def delete_post_from_community(post_id): if current_app.debug: delete_post_from_community_task(post_id) else: delete_post_from_community_task.delay(post_id) @celery.task def delete_post_from_community_task(post_id): post = Post.query.get(post_id) community = post.community post.deleted = True post.deleted_by = current_user.id db.session.commit() if not community.local_only: delete_json = { 'id': f"https://{current_app.config['SERVER_NAME']}/activities/delete/{gibberish(15)}", 'type': 'Delete', 'actor': current_user.public_url(), 'audience': post.community.public_url(), 'to': [post.community.public_url(), 'https://www.w3.org/ns/activitystreams#Public'], 'published': ap_datetime(utcnow()), 'cc': [ current_user.followers_url() ], 'object': post.ap_id, } if not post.community.is_local(): # this is a remote community, send it to the instance that hosts it success = post_request(post.community.ap_inbox_url, delete_json, current_user.private_key, current_user.public_url() + '#main-key') else: # local community - send it to followers on remote instances announce = { "id": f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}", "type": 'Announce', "to": [ "https://www.w3.org/ns/activitystreams#Public" ], "actor": post.community.ap_profile_id, "cc": [ post.community.ap_followers_url ], '@context': default_context(), 'object': delete_json } for instance in post.community.following_instances(): if instance.inbox and not current_user.has_blocked_instance(instance.id) and not instance_banned( instance.domain): send_to_remote_instance(instance.id, post.community.id, announce) def delete_post_reply_from_community(post_reply_id): if current_app.debug: delete_post_reply_from_community_task(post_reply_id) else: delete_post_reply_from_community_task.delay(post_reply_id) @celery.task def delete_post_reply_from_community_task(post_reply_id): post_reply = PostReply.query.get(post_reply_id) post = post_reply.post community = post.community if post_reply.user_id == current_user.id or community.is_moderator(): post_reply.deleted = True post_reply.deleted_by = current_user.id db.session.commit() # federate delete if not post.community.local_only: delete_json = { 'id': f"https://{current_app.config['SERVER_NAME']}/activities/delete/{gibberish(15)}", 'type': 'Delete', 'actor': current_user.public_url(), 'audience': post.community.public_url(), 'to': [post.community.public_url(), 'https://www.w3.org/ns/activitystreams#Public'], 'published': ap_datetime(utcnow()), 'cc': [ current_user.followers_url() ], 'object': post_reply.ap_id, } if not post.community.is_local(): # this is a remote community, send it to the instance that hosts it success = post_request(post.community.ap_inbox_url, delete_json, current_user.private_key, current_user.public_url() + '#main-key') else: # local community - send it to followers on remote instances announce = { "id": f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}", "type": 'Announce', "to": [ "https://www.w3.org/ns/activitystreams#Public" ], "actor": post.community.ap_profile_id, "cc": [ post.community.ap_followers_url ], '@context': default_context(), 'object': delete_json } for instance in post.community.following_instances(): if instance.inbox and not current_user.has_blocked_instance(instance.id) and not instance_banned( instance.domain): send_to_remote_instance(instance.id, post.community.id, announce) def remove_old_file(file_id): remove_file = File.query.get(file_id) remove_file.delete_from_disk() def save_icon_file(icon_file, directory='communities') -> File: # check if this is an allowed type of file file_ext = os.path.splitext(icon_file.filename)[1] if file_ext.lower() not in allowed_extensions: abort(400) new_filename = gibberish(15) # set up the storage directory directory = f'app/static/media/{directory}/' + new_filename[0:2] + '/' + new_filename[2:4] ensure_directory_exists(directory) # save the file final_place = os.path.join(directory, new_filename + file_ext) final_place_thumbnail = os.path.join(directory, new_filename + '_thumbnail.webp') icon_file.save(final_place) if file_ext.lower() == '.heic': register_heif_opener() elif file_ext.lower() == '.avif': import pillow_avif # resize if necessary if file_ext.lower() in allowed_extensions: if file_ext.lower() == '.svg': # svgs don't need to be resized file = File(file_path=final_place, file_name=new_filename + file_ext, alt_text=f'{directory} icon', thumbnail_path=final_place) db.session.add(file) return file else: Image.MAX_IMAGE_PIXELS = 89478485 img = Image.open(final_place) img = ImageOps.exif_transpose(img) img_width = img.width img_height = img.height if img.width > 250 or img.height > 250: img.thumbnail((250, 250)) img.save(final_place) img_width = img.width img_height = img.height # save a second, smaller, version as a thumbnail img.thumbnail((40, 40)) img.save(final_place_thumbnail, format="WebP", quality=93) thumbnail_width = img.width thumbnail_height = img.height file = File(file_path=final_place, file_name=new_filename + file_ext, alt_text=f'{directory} icon', width=img_width, height=img_height, thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height, thumbnail_path=final_place_thumbnail) db.session.add(file) return file else: abort(400) def save_banner_file(banner_file, directory='communities') -> File: # check if this is an allowed type of file file_ext = os.path.splitext(banner_file.filename)[1] if file_ext.lower() not in allowed_extensions: abort(400) new_filename = gibberish(15) # set up the storage directory directory = f'app/static/media/{directory}/' + new_filename[0:2] + '/' + new_filename[2:4] ensure_directory_exists(directory) # save the file final_place = os.path.join(directory, new_filename + file_ext) final_place_thumbnail = os.path.join(directory, new_filename + '_thumbnail.webp') banner_file.save(final_place) if file_ext.lower() == '.heic': register_heif_opener() elif file_ext.lower() == '.avif': import pillow_avif # resize if necessary Image.MAX_IMAGE_PIXELS = 89478485 img = Image.open(final_place) if '.' + img.format.lower() in allowed_extensions: img = ImageOps.exif_transpose(img) img_width = img.width img_height = img.height if img.width > 1600 or img.height > 600: img.thumbnail((1600, 600)) img.save(final_place) img_width = img.width img_height = img.height # save a second, smaller, version as a thumbnail img.thumbnail((878, 500)) img.save(final_place_thumbnail, format="WebP", quality=93) thumbnail_width = img.width thumbnail_height = img.height file = File(file_path=final_place, file_name=new_filename + file_ext, alt_text=f'{directory} banner', width=img_width, height=img_height, thumbnail_path=final_place_thumbnail, thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height) db.session.add(file) return file else: abort(400) # NB this always signs POSTs as the community so is only suitable for Announce activities def send_to_remote_instance(instance_id: int, community_id: int, payload): if current_app.debug: send_to_remote_instance_task(instance_id, community_id, payload) else: send_to_remote_instance_task.delay(instance_id, community_id, payload) @celery.task def send_to_remote_instance_task(instance_id: int, community_id: int, payload): session = get_task_session() community: Community = session.query(Community).get(community_id) if community: instance: Instance = session.query(Instance).get(instance_id) if instance.inbox and instance.online() and not instance_banned(instance.domain): if post_request(instance.inbox, payload, community.private_key, community.ap_profile_id + '#main-key', timeout=10) is True: instance.last_successful_send = utcnow() instance.failures = 0 else: instance.failures += 1 instance.most_recent_attempt = utcnow() instance.start_trying_again = utcnow() + timedelta(seconds=instance.failures ** 4) if instance.failures > 10: instance.dormant = True session.commit() session.close() def community_in_list(community_id, community_list): for tup in community_list: if community_id == tup[0]: return True return False def find_local_users(search: str) -> List[User]: return User.query.filter(User.banned == False, User.deleted == False, User.ap_id == None, User.user_name.ilike(f"%{search}%")).\ order_by(desc(User.reputation)).all()