"""Helpers for looking up remote communities, saving posts and storing uploaded community/post images."""
from datetime import datetime, timedelta
from threading import Thread
from time import sleep
from typing import List
import requests
from PIL import Image, ImageOps
from flask import request, abort, g, current_app
from flask_login import current_user
from pillow_heif import register_heif_opener

from app import db, cache, celery
from app.activitypub.signature import post_request
from app.activitypub.util import find_actor_or_create, actor_json_to_model, post_json_to_model
from app.constants import POST_TYPE_ARTICLE, POST_TYPE_LINK, POST_TYPE_IMAGE
from app.models import Community, File, BannedInstances, PostReply, PostVote, Post, utcnow, CommunityMember, Site, \
    Instance
from app.utils import get_request, gibberish, markdown_to_html, domain_from_url, validate_image, allowlist_html, \
    html_to_markdown, is_image_url, ensure_directory_exists, inbox_domain, post_ranking
from sqlalchemy import desc, text
import os
from opengraph_parse import parse_page


# File extensions accepted for uploaded images (compared against the lower-cased extension)
allowed_extensions = ['.gif', '.jpg', '.jpeg', '.png', '.webp', '.heic']


def search_for_community(address: str):
    """Find a community by its '!name@server' address, creating it locally from the remote profile if needed.

    :param address: community address; only addresses starting with '!' are looked up.
    :return: the existing or newly created Community, or None when nothing suitable was found.
    :raises Exception: when the server part of the address is listed in BannedInstances.
    :raises ValueError: when the part after '!' does not contain exactly one '@'.
    """
    if not address.startswith('!'):
        return None

    # 'name' is not used further, but the two-way unpack rejects malformed addresses
    name, server = address[1:].split('@')

    banned = BannedInstances.query.filter_by(domain=server).first()
    if banned:
        reason = f" Reason: {banned.reason}" if banned.reason is not None else ''
        raise Exception(f"{server} is blocked.{reason}")  # todo: create custom exception class hierarchy

    already_exists = Community.query.filter_by(ap_id=address[1:]).first()
    if already_exists:
        return already_exists

    # Look up the profile address of the community using WebFinger
    # todo: try, except block around every get_request
    webfinger_data = get_request(f"https://{server}/.well-known/webfinger",
                                 params={'resource': f"acct:{address[1:]}"})
    if webfinger_data.status_code == 200:
        webfinger_json = webfinger_data.json()
        webfinger_data.close()  # release the connection, same as is done for the profile request below
        for links in webfinger_json['links']:
            if 'rel' in links and links['rel'] == 'self':  # this contains the URL of the activitypub profile
                accept_type = links['type'] if 'type' in links else 'application/activity+json'
                # retrieve the activitypub profile
                community_data = get_request(links['href'], headers={'Accept': accept_type})
                # to see the structure of the json contained in community_data, do a GET to
                # https://lemmy.world/c/technology with header Accept: application/activity+json
                if community_data.status_code == 200:
                    community_json = community_data.json()
                    community_data.close()
                    if community_json['type'] == 'Group':
                        community = actor_json_to_model(community_json, address, server)
                        # in debug mode run the backfill inline, otherwise hand it to celery
                        if current_app.debug:
                            retrieve_mods_and_backfill(community.id)
                        else:
                            retrieve_mods_and_backfill.delay(community.id)
                        return community
    return None


@celery.task
def retrieve_mods_and_backfill(community_id: int):
    """Fetch the moderators of a remote community and import up to 50 activities from its outbox.

    :param community_id: primary key of the Community to process.
    """
    with current_app.app_context():
        community = Community.query.get(community_id)
        site = Site.query.get(1)
        if community.ap_moderators_url:
            mods_request = get_request(community.ap_moderators_url, headers={'Accept': 'application/activity+json'})
            if mods_request.status_code == 200:
                mods_data = mods_request.json()
                mods_request.close()
                if mods_data and mods_data['type'] == 'OrderedCollection' and 'orderedItems' in mods_data:
                    for actor in mods_data['orderedItems']:
                        sleep(0.5)  # pause between actor lookups
                        user = find_actor_or_create(actor)
                        if user:
                            existing_membership = CommunityMember.query.filter_by(community_id=community.id,
                                                                                  user_id=user.id).first()
                            if existing_membership:
                                existing_membership.is_moderator = True
                            else:
                                new_membership = CommunityMember(community_id=community.id, user_id=user.id,
                                                                 is_moderator=True)
                                db.session.add(new_membership)
                            db.session.commit()

        # only backfill nsfw if nsfw communities are allowed
        if (community.nsfw and not site.enable_nsfw) or (community.nsfl and not site.enable_nsfl):
            return

        # download 50 old posts
        if community.ap_public_url:
            outbox_request = get_request(community.ap_public_url + '/outbox',
                                         headers={'Accept': 'application/activity+json'})
            if outbox_request.status_code == 200:
                outbox_data = outbox_request.json()
                outbox_request.close()
                if outbox_data['type'] == 'OrderedCollection' and 'orderedItems' in outbox_data:
                    activities_processed = 0
                    for activity in outbox_data['orderedItems']:
                        user = find_actor_or_create(activity['object']['actor'])
                        if user:
                            post = post_json_to_model(activity['object']['object'], user, community)
                            post.ap_create_id = activity['object']['id']
                            post.ap_announce_id = activity['id']
                            post.ranking = post_ranking(post.score, post.posted_at)
                            db.session.commit()

                        activities_processed += 1
                        if activities_processed >= 50:
                            break
                    c = Community.query.get(community.id)
                    c.post_count = activities_processed
                    c.last_active = utcnow()
                    db.session.commit()


def community_url_exists(url) -> bool:
    """Return True when a Community with ap_profile_id equal to *url* exists."""
    community = Community.query.filter_by(ap_profile_id=url).first()
    return community is not None


def actor_to_community(actor) -> Community:
    """Return the non-banned Community matching *actor*, or None if there is no match.

    A value containing '@' is matched against ap_id; otherwise it is matched against the name of a
    community whose ap_id is None.
    """
    actor = actor.strip()
    if '@' in actor:
        return Community.query.filter_by(banned=False, ap_id=actor).first()
    return Community.query.filter_by(name=actor, banned=False, ap_id=None).first()


@cache.memoize(timeout=50)
def opengraph_parse(url):
    """Return the result of parse_page(url), or None if parsing raised any exception (memoized)."""
    try:
        return parse_page(url)
    except Exception:
        # best effort: any failure just means no opengraph data is available
        return None


def url_to_thumbnail_file(filename) -> File:
    """Download the image at URL *filename* and shrink it to fit within 150x150.

    :param filename: URL of the image; its extension is reused for the stored file.
    :return: a new File (not added to the db session) describing the thumbnail, or None when the
             download did not return HTTP 200.
    """
    unused, file_extension = os.path.splitext(filename)
    response = requests.get(filename, timeout=5)
    if response.status_code != 200:
        return None

    new_filename = gibberish(15)
    directory = 'app/static/media/posts/' + new_filename[0:2] + '/' + new_filename[2:4]
    ensure_directory_exists(directory)
    final_place = os.path.join(directory, new_filename + file_extension)
    with open(final_place, 'wb') as f:
        f.write(response.content)

    # the downloaded file is overwritten with its own thumbnail
    with Image.open(final_place) as img:
        img = ImageOps.exif_transpose(img)
        img.thumbnail((150, 150))
        img.save(final_place)
        thumbnail_width = img.width
        thumbnail_height = img.height
    return File(file_name=new_filename + file_extension, thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height, thumbnail_path=final_place,
                source_url=filename)


def _store_uploaded_image(uploaded_file, media_subdir, alt_text, max_size, thumbnail_size) -> File:
    """Validate an uploaded image, store it under app/static/media/<media_subdir>/ and create a WebP thumbnail.

    :param uploaded_file: upload object providing .filename, .stream and .save(path).
    :param media_subdir: sub-directory of app/static/media/ to store the file in.
    :param alt_text: alt text recorded on the returned File.
    :param max_size: (width, height); the stored image is shrunk to fit if it exceeds either dimension.
    :param thumbnail_size: (width, height) bounding box of the thumbnail.
    :return: a new File describing the stored image and its thumbnail (not added to the db session).

    Aborts the request with HTTP 400 when the extension is not in allowed_extensions or does not equal
    the value validate_image() returns for the upload's stream.
    """
    # check if this is an allowed type of file
    file_ext = os.path.splitext(uploaded_file.filename)[1]
    if file_ext.lower() not in allowed_extensions or file_ext != validate_image(uploaded_file.stream):
        abort(400)

    new_filename = gibberish(15)

    # set up the storage directory
    storage_dir = f'app/static/media/{media_subdir}/' + new_filename[0:2] + '/' + new_filename[2:4]
    ensure_directory_exists(storage_dir)

    # save the file
    final_place = os.path.join(storage_dir, new_filename + file_ext)
    final_place_thumbnail = os.path.join(storage_dir, new_filename + '_thumbnail.webp')
    uploaded_file.save(final_place)

    if file_ext.lower() == '.heic':
        register_heif_opener()

    max_width, max_height = max_size
    # the context manager makes sure the handle opened by Image.open is released again
    with Image.open(final_place) as source_img:
        img = ImageOps.exif_transpose(source_img)

        # resize if necessary
        if img.width > max_width or img.height > max_height:
            img.thumbnail(max_size)
            img.save(final_place)
        img_width = img.width
        img_height = img.height

        # save a second, smaller, version as a thumbnail
        img.thumbnail(thumbnail_size)
        img.save(final_place_thumbnail, format="WebP", quality=93)
        thumbnail_width = img.width
        thumbnail_height = img.height

    return File(file_path=final_place, file_name=new_filename + file_ext, alt_text=alt_text,
                width=img_width, height=img_height, thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height, thumbnail_path=final_place_thumbnail)


def save_post(form, post: Post):
    """Copy the submitted form data onto *post* and create any thumbnail / image File it needs.

    :param form: submitted post form; form.type.data selects the kind of post
                 ('' or 'discussion', 'link', 'image', 'poll').
    :param post: the Post being created (post.id is None) or edited.
    :raises Exception: when form.type.data is not one of the known post types.

    New posts are added to the db session here; nothing in this function commits the session.
    """
    post.nsfw = form.nsfw.data
    post.nsfl = form.nsfl.data
    post.notify_author = form.notify_author.data
    if form.type.data == '' or form.type.data == 'discussion':
        post.title = form.discussion_title.data
        post.body = form.discussion_body.data
        post.body_html = markdown_to_html(post.body)
        post.type = POST_TYPE_ARTICLE
    elif form.type.data == 'link':
        post.title = form.link_title.data
        url_changed = post.id is None or form.link_url.data != post.url
        post.url = form.link_url.data
        post.type = POST_TYPE_LINK
        domain = domain_from_url(form.link_url.data)
        domain.post_count += 1
        post.domain = domain

        if url_changed:
            if post.image_id:
                remove_old_file(post.image_id)
                post.image_id = None

            valid_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
            unused, file_extension = os.path.splitext(form.link_url.data)  # do not use _ here instead of 'unused'
            # this url is a link to an image - generate a thumbnail of it
            # (extension compared case-insensitively, the same way the og:image check below does)
            if file_extension.lower() in valid_extensions:
                file = url_to_thumbnail_file(form.link_url.data)
                if file:
                    post.image = file
                    db.session.add(file)
            else:
                # check opengraph tags on the page and make a thumbnail if an image is available in the
                # og:image meta tag
                opengraph = opengraph_parse(form.link_url.data)
                if opengraph and opengraph.get('og:image', '') != '':
                    filename = opengraph.get('og:image')
                    unused, file_extension = os.path.splitext(filename)
                    if file_extension.lower() in valid_extensions:
                        file = url_to_thumbnail_file(filename)
                        if file:
                            file.alt_text = opengraph.get('og:title')
                            post.image = file
                            db.session.add(file)

    elif form.type.data == 'image':
        post.title = form.image_title.data
        post.type = POST_TYPE_IMAGE
        uploaded_file = request.files['image_file']
        if uploaded_file and uploaded_file.filename != '':
            if post.image_id:
                remove_old_file(post.image_id)
                post.image_id = None

            # validate and store the upload (max 2000x2000) plus a 256x256 thumbnail
            file = _store_uploaded_image(uploaded_file, 'posts', form.image_title.data, (2000, 2000), (256, 256))
            post.image = file
            db.session.add(file)

    elif form.type.data == 'poll':
        ...
    else:
        raise Exception('invalid post type')

    if post.id is None:
        # authors with a reputation above 100 or below -100 get an automatic vote recorded as user_id=1
        if current_user.reputation > 100:
            postvote = PostVote(user_id=1, author_id=current_user.id, post=post, effect=1.0)
            post.up_votes = 1
            post.score = 1
            post.ranking = 1
            db.session.add(postvote)
        if current_user.reputation < -100:
            postvote = PostVote(user_id=1, author_id=current_user.id, post=post, effect=-1.0)
            post.score = -1
            post.ranking = -1
            db.session.add(postvote)
        post.ranking = post_ranking(post.score, utcnow())

        db.session.add(post)

    g.site.last_active = utcnow()


def remove_old_file(file_id):
    """Delete the stored files of the File with id *file_id* from disk; do nothing if no such File exists."""
    remove_file = File.query.get(file_id)
    if remove_file is not None:
        remove_file.delete_from_disk()


def save_icon_file(icon_file, directory='communities') -> File:
    """Store an uploaded icon (shrunk to fit 250x250) with a 40x40 thumbnail and add its File to the session.

    :param icon_file: the uploaded file.
    :param directory: sub-directory of app/static/media/ to store it in; also used in the alt text.
    :return: the new File, already added to the db session (not committed).
    """
    # alt text is built from the directory *name* that was passed in, not from the storage path
    file = _store_uploaded_image(icon_file, directory, f'{directory} icon', (250, 250), (40, 40))
    db.session.add(file)
    return file


def save_banner_file(banner_file, directory='communities') -> File:
    """Store an uploaded banner (shrunk to fit 1600x600) with a 700x500 thumbnail and add its File to the session.

    :param banner_file: the uploaded file.
    :param directory: sub-directory of app/static/media/ to store it in; also used in the alt text.
    :return: the new File, already added to the db session (not committed); thumbnail_path points at the
             generated WebP thumbnail.
    """
    # alt text is built from the directory *name* that was passed in, not from the storage path
    file = _store_uploaded_image(banner_file, directory, f'{directory} banner', (1600, 600), (700, 500))
    db.session.add(file)
    return file


# NB this always signs POSTs as the community so is only suitable for Announce activities
def send_to_remote_instance(instance_id: int, community_id: int, payload):
    """Send *payload* to an instance: inline when the app is in debug mode, otherwise via the celery task."""
    if current_app.debug:
        send_to_remote_instance_task(instance_id, community_id, payload)
    else:
        send_to_remote_instance_task.delay(instance_id, community_id, payload)


@celery.task
def send_to_remote_instance_task(instance_id: int, community_id: int, payload):
    """POST *payload* to the instance's inbox, signed with the community's key, and record the outcome.

    Does nothing when either the community or the instance cannot be found.
    """
    community = Community.query.get(community_id)
    if not community:
        return
    instance = Instance.query.get(instance_id)
    if instance is None:
        return

    if post_request(instance.inbox, payload, community.private_key, community.ap_profile_id + '#main-key'):
        instance.last_successful_send = utcnow()
        instance.failures = 0
    else:
        instance.failures += 1
        instance.most_recent_attempt = utcnow()
        # back off: wait failures^4 seconds before the next attempt
        instance.start_trying_again = utcnow() + timedelta(seconds=instance.failures ** 4)
        if instance.failures > 2:
            instance.dormant = True
    db.session.commit()