pyfedi/app/community/util.py

668 lines
30 KiB
Python

from datetime import datetime, timedelta
from threading import Thread
from time import sleep
from typing import List
import requests
from PIL import Image, ImageOps
from flask import request, abort, g, current_app, json
from flask_login import current_user
from pillow_heif import register_heif_opener
from app import db, cache, celery
from app.activitypub.signature import post_request, default_context
from app.activitypub.util import find_actor_or_create, actor_json_to_model, post_json_to_model, ensure_domains_match, \
find_hashtag_or_create
from app.constants import POST_TYPE_ARTICLE, POST_TYPE_LINK, POST_TYPE_IMAGE, POST_TYPE_VIDEO, NOTIF_POST
from app.models import Community, File, BannedInstances, PostReply, PostVote, Post, utcnow, CommunityMember, Site, \
Instance, Notification, User, ActivityPubLog, NotificationSubscription, Language, Tag
from app.utils import get_request, gibberish, markdown_to_html, domain_from_url, allowlist_html, \
is_image_url, ensure_directory_exists, inbox_domain, post_ranking, shorten_string, parse_page, \
remove_tracking_from_link, ap_datetime, instance_banned, blocked_phrases
from sqlalchemy import func, desc, text
import os
# Lower-case file extensions (with leading dot) accepted for images, both
# uploaded files and image URLs; callers compare against `ext.lower()`.
allowed_extensions = ['.gif', '.jpg', '.jpeg', '.png', '.webp', '.heic']
def search_for_community(address: str):
    """Find a community by its ``!name@server`` address, fetching it remotely if needed.

    A Community whose ap_id already matches is returned straight away.
    Otherwise the server's WebFinger endpoint is queried, the ActivityPub
    profile it links to is downloaded and, if it is a 'Group' actor, passed to
    ``actor_json_to_model``. For a newly created community the moderators and
    old posts are then retrieved (inline in debug mode, via Celery otherwise).

    :param address: community address in the form ``!name@server``
    :return: a Community, or None when the address is malformed, a request
        fails, or no usable 'Group' actor is found
    :raises Exception: when the server is listed in BannedInstances
    """
    from random import randint  # local import: only used on the retry path

    if not address.startswith('!'):
        return None

    handle = address[1:]
    # Require exactly one '@' with text on both sides; anything else cannot be looked up.
    if handle.count('@') != 1:
        return None
    name, server = handle.split('@')
    if not name or not server:
        return None

    banned = BannedInstances.query.filter_by(domain=server).first()
    if banned:
        reason = f" Reason: {banned.reason}" if banned.reason is not None else ''
        raise Exception(f"{server} is blocked.{reason}")  # todo: create custom exception class hierarchy

    already_exists = Community.query.filter_by(ap_id=handle).first()
    if already_exists:
        return already_exists

    # Look up the profile address of the community using WebFinger
    webfinger_url = f"https://{server}/.well-known/webfinger"
    webfinger_params = {'resource': f"acct:{handle}"}
    try:
        webfinger_data = get_request(webfinger_url, params=webfinger_params)
    except requests.exceptions.ReadTimeout:
        # Wait a few seconds and retry exactly once before giving up.
        sleep(randint(3, 10))
        try:
            webfinger_data = get_request(webfinger_url, params=webfinger_params)
        except requests.exceptions.RequestException:
            return None
    except requests.exceptions.RequestException:
        return None

    if webfinger_data.status_code == 200:
        webfinger_json = webfinger_data.json()
        webfinger_data.close()
        for link in webfinger_json.get('links', []):
            if 'rel' in link and link['rel'] == 'self':  # this contains the URL of the activitypub profile
                accept_type = link['type'] if 'type' in link else 'application/activity+json'
                # retrieve the activitypub profile
                community_data = get_request(link['href'], headers={'Accept': accept_type})
                # to see the structure of the json contained in community_data, do a GET to
                # https://lemmy.world/c/technology with header Accept: application/activity+json
                if community_data.status_code == 200:
                    community_json = community_data.json()
                    community_data.close()
                    if community_json.get('type') == 'Group':
                        community = actor_json_to_model(community_json, name, server)
                        if community:
                            if current_app.debug:
                                retrieve_mods_and_backfill(community.id)
                            else:
                                retrieve_mods_and_backfill.delay(community.id)
                        return community
    return None
@celery.task
def retrieve_mods_and_backfill(community_id: int):
    """Fetch moderators, old posts and featured posts for a community.

    1. If the community has a moderators URL, download that collection and
       flag every resolvable actor as a moderator of the community.
    2. If the community is NSFW/NSFL and the site has that content disabled,
       stop here (the featured step is skipped as well).
    3. Download the outbox and turn its activities into posts, stopping once
       50 activities have been counted. Posts sharing a URL with other posts
       made within 3 days either side are linked as cross-posts.
    4. If the community has a featured URL, mark the listed posts as sticky.

    :param community_id: primary key of the Community to process; nothing
        happens when no such community exists
    """
    with current_app.app_context():
        community = Community.query.get(community_id)
        if community is None:
            return
        site = Site.query.get(1)
        if community.ap_moderators_url:
            mods_request = get_request(community.ap_moderators_url, headers={'Accept': 'application/activity+json'})
            if mods_request.status_code == 200:
                mods_data = mods_request.json()
                mods_request.close()
                if isinstance(mods_data, dict) and mods_data.get('type') == 'OrderedCollection' \
                        and 'orderedItems' in mods_data:
                    for actor in mods_data['orderedItems']:
                        sleep(0.5)  # pause between actor lookups
                        user = find_actor_or_create(actor)
                        if user:
                            existing_membership = CommunityMember.query.filter_by(community_id=community.id,
                                                                                  user_id=user.id).first()
                            if existing_membership:
                                existing_membership.is_moderator = True
                            else:
                                new_membership = CommunityMember(community_id=community.id, user_id=user.id,
                                                                 is_moderator=True)
                                db.session.add(new_membership)
                    db.session.commit()

        # only backfill nsfw if nsfw communities are allowed
        if (community.nsfw and not site.enable_nsfw) or (community.nsfl and not site.enable_nsfl):
            return

        # download 50 old posts
        # The ap_public_url condition is kept as it was; the outbox URL is checked too
        # because it is the URL actually requested.
        # NOTE(review): confirm whether ap_public_url is still meant to gate the backfill.
        if community.ap_public_url and community.ap_outbox_url:
            outbox_request = get_request(community.ap_outbox_url, headers={'Accept': 'application/activity+json'})
            if outbox_request.status_code == 200:
                outbox_data = outbox_request.json()
                outbox_request.close()
                if 'type' in outbox_data and outbox_data['type'] == 'OrderedCollection' and 'orderedItems' in outbox_data:
                    activities_processed = 0
                    for activity in outbox_data['orderedItems']:
                        # Logged as a failure up front; post_json_to_model receives the log entry.
                        activity_log = ActivityPubLog(direction='in', activity_id=activity['id'],
                                                      activity_type='Announce', result='failure')
                        if site.log_activitypub_json:
                            activity_log.activity_json = json.dumps(activity)
                        db.session.add(activity_log)
                        if 'object' in activity and 'object' in activity['object']:
                            if not ensure_domains_match(activity['object']['object']):
                                activity_log.exception_message = 'Domains do not match'
                                db.session.commit()
                                continue
                            user = find_actor_or_create(activity['object']['actor'])
                            if user and user.is_local():
                                activity_log.exception_message = 'Activity about local content which is already present'
                                db.session.commit()
                                continue
                            if user:
                                post = post_json_to_model(activity_log, activity['object']['object'], user, community)
                                if post:
                                    post.ap_create_id = activity['object']['id']
                                    post.ap_announce_id = activity['id']
                                    post.ranking = post_ranking(post.score, post.posted_at)
                                    if post.url:
                                        # Same URL posted within 3 days either side counts as a cross-post.
                                        other_posts = Post.query.filter(Post.id != post.id, Post.url == post.url,
                                                                        Post.posted_at > post.posted_at - timedelta(days=3),
                                                                        Post.posted_at < post.posted_at + timedelta(days=3)).all()
                                        for op in other_posts:
                                            if op.cross_posts is None:
                                                op.cross_posts = [post.id]
                                            else:
                                                op.cross_posts.append(post.id)
                                            if post.cross_posts is None:
                                                post.cross_posts = [op.id]
                                            else:
                                                post.cross_posts.append(op.id)
                                    db.session.commit()
                            else:
                                activity_log.exception_message = 'Could not find or create actor'
                                db.session.commit()

                        activities_processed += 1
                        if activities_processed >= 50:
                            break
                    c = Community.query.get(community.id)
                    if c.post_count > 0:
                        newest_post = Post.query.filter(Post.community_id == community_id) \
                            .order_by(desc(Post.posted_at)).first()
                        # post_count may be positive while no Post row is found; skip in that case.
                        if newest_post is not None:
                            c.last_active = newest_post.posted_at
                    db.session.commit()

        if community.ap_featured_url:
            featured_request = get_request(community.ap_featured_url, headers={'Accept': 'application/activity+json'})
            if featured_request.status_code == 200:
                featured_data = featured_request.json()
                featured_request.close()
                if isinstance(featured_data, dict) and featured_data.get('type') == 'OrderedCollection' \
                        and 'orderedItems' in featured_data:
                    for item in featured_data['orderedItems']:
                        # Collection items may be embedded objects or bare id strings.
                        featured_id = item['id'] if isinstance(item, dict) else item
                        p = Post.query.filter(Post.ap_id == featured_id).first()
                        if p:
                            p.sticky = True
                    db.session.commit()
def actor_to_community(actor) -> Community:
    """Resolve an actor string to a non-banned Community, or None if there is no match.

    A value containing '@' is matched against ``ap_id``. Any other value is
    matched case-insensitively against ``name`` among communities whose
    ``ap_id`` is None.
    """
    actor = actor.strip()
    if '@' not in actor:
        by_name = Community.query.filter(func.lower(Community.name) == func.lower(actor))
        return by_name.filter_by(banned=False, ap_id=None).first()
    return Community.query.filter_by(banned=False, ap_id=actor).first()
def opengraph_parse(url):
    """Return what ``parse_page`` yields for ``url``, or None if it raises.

    Everything from the first '?' onwards is dropped from the URL beforehand.
    """
    if '?' in url:
        url, _, _ = url.partition('?')
    try:
        parsed = parse_page(url)
    except Exception:
        # Best effort: any failure while parsing just means "no data".
        parsed = None
    return parsed
def url_to_thumbnail_file(filename) -> File:
    """Download the image at ``filename`` and shrink it to fit within 150x150.

    The downloaded file is stored under ``app/static/media/posts/`` with a
    random name and the extension taken from the URL (query string ignored),
    then overwritten in place with the EXIF-rotated, resized image.

    :param filename: URL of the remote image
    :return: a File describing the thumbnail (not added to the db session
        here), or None when the download fails or the response is not HTTP 200
    """
    filename_for_extension = filename.split('?')[0] if '?' in filename else filename
    _, file_extension = os.path.splitext(filename_for_extension)
    try:
        response = requests.get(filename, timeout=5)
    except requests.exceptions.RequestException:
        # The thumbnail is optional, so an unreachable host must not fail the caller.
        return None

    try:
        if response.status_code != 200:
            return None
        new_filename = gibberish(15)
        directory = 'app/static/media/posts/' + new_filename[0:2] + '/' + new_filename[2:4]
        ensure_directory_exists(directory)
        final_place = os.path.join(directory, new_filename + file_extension)
        with open(final_place, 'wb') as f:
            f.write(response.content)
    finally:
        response.close()

    Image.MAX_IMAGE_PIXELS = 89478485  # Pillow's limit on image size in pixels
    with Image.open(final_place) as img:
        img = ImageOps.exif_transpose(img)
        img.thumbnail((150, 150))
        img.save(final_place)
        thumbnail_width = img.width
        thumbnail_height = img.height
    return File(file_name=new_filename + file_extension, thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height, thumbnail_path=final_place,
                source_url=filename)
def save_post(form, post: Post, type: str):
    """Fill ``post`` from a submitted form, persist it and refresh the author's reply subscription.

    Aborts with HTTP 400 when an uploaded image has a disallowed extension or
    format, and with HTTP 401 when a new post's title or body contains a
    blocked phrase.

    :param form: submitted form; which fields are read depends on ``type``
    :param post: the Post to populate; ``post.id is None`` is treated as a new post
    :param type: '' or 'discussion', 'link', 'image', 'video' or 'poll'
    :raises Exception: when ``type`` is none of the values above
    """
    # Settings shared by every post type
    post.indexable = current_user.indexable
    post.sticky = form.sticky.data
    post.nsfw = form.nsfw.data
    post.nsfl = form.nsfl.data
    post.notify_author = form.notify_author.data
    post.language_id = form.language_id.data
    # The chosen language is also stored on the current user
    current_user.language_id = form.language_id.data
    if type == '' or type == 'discussion':
        post.title = form.discussion_title.data
        post.body = form.discussion_body.data
        post.body_html = markdown_to_html(post.body)
        post.type = POST_TYPE_ARTICLE
    elif type == 'link':
        post.title = form.link_title.data
        post.body = form.link_body.data
        post.body_html = markdown_to_html(post.body)
        # Must be evaluated before post.url is overwritten on the next line
        url_changed = post.id is None or form.link_url.data != post.url
        post.url = remove_tracking_from_link(form.link_url.data.strip())
        post.type = POST_TYPE_LINK
        domain = domain_from_url(form.link_url.data)
        # NOTE(review): incremented on every save, including edits that keep the same URL
        domain.post_count += 1
        post.domain = domain

        if url_changed:
            # The previous image belongs to the old URL
            if post.image_id:
                remove_old_file(post.image_id)
                post.image_id = None

            if post.url.endswith('.mp4') or post.url.endswith('.webm'):
                file = File(source_url=form.link_url.data) # make_image_sizes() will take care of turning this into a still image
                post.image = file
                db.session.add(file)
            else:
                unused, file_extension = os.path.splitext(form.link_url.data)
                # this url is a link to an image - turn it into a image post
                if file_extension.lower() in allowed_extensions:
                    file = File(source_url=form.link_url.data)
                    post.image = file
                    db.session.add(file)
                    post.type = POST_TYPE_IMAGE
                else:
                    # check opengraph tags on the page and make a thumbnail if an image is available in the og:image meta tag
                    opengraph = opengraph_parse(form.link_url.data)
                    if opengraph and (opengraph.get('og:image', '') != '' or opengraph.get('og:image:url', '') != ''):
                        filename = opengraph.get('og:image') or opengraph.get('og:image:url')
                        filename_for_extension = filename.split('?')[0] if '?' in filename else filename
                        unused, file_extension = os.path.splitext(filename_for_extension)
                        # Image URLs starting with '/' are skipped
                        if file_extension.lower() in allowed_extensions and not filename.startswith('/'):
                            file = url_to_thumbnail_file(filename)
                            if file:
                                file.alt_text = shorten_string(opengraph.get('og:title'), 295)
                                post.image = file
                                db.session.add(file)

    elif type == 'image':
        post.title = form.image_title.data
        post.body = form.image_body.data
        post.body_html = markdown_to_html(post.body)
        post.type = POST_TYPE_IMAGE
        # Fall back to the title when no alt text was supplied
        alt_text = form.image_alt_text.data if form.image_alt_text.data else form.image_title.data
        uploaded_file = request.files['image_file']
        if uploaded_file and uploaded_file.filename != '':
            # A new upload replaces any image already attached to the post
            if post.image_id:
                remove_old_file(post.image_id)
                post.image_id = None

            # check if this is an allowed type of file
            file_ext = os.path.splitext(uploaded_file.filename)[1]
            if file_ext.lower() not in allowed_extensions:
                abort(400)
            new_filename = gibberish(15)

            # set up the storage directory
            directory = 'app/static/media/posts/' + new_filename[0:2] + '/' + new_filename[2:4]
            ensure_directory_exists(directory)

            # save the file
            final_place = os.path.join(directory, new_filename + file_ext)
            final_place_medium = os.path.join(directory, new_filename + '_medium.webp')
            final_place_thumbnail = os.path.join(directory, new_filename + '_thumbnail.webp')
            uploaded_file.seek(0)
            uploaded_file.save(final_place)

            if file_ext.lower() == '.heic':
                register_heif_opener()

            Image.MAX_IMAGE_PIXELS = 89478485

            # resize if necessary
            img = Image.open(final_place)
            # The format Pillow detected must be allowed too, not only the file name's extension
            if '.' + img.format.lower() in allowed_extensions:
                img = ImageOps.exif_transpose(img)
                img_width = img.width
                img_height = img.height
                # thumbnail() resizes in place, so each step below works on the already shrunk image
                img.thumbnail((2000, 2000))
                img.save(final_place)
                if img.width > 512 or img.height > 512:
                    img.thumbnail((512, 512))
                    img.save(final_place_medium, format="WebP", quality=93)
                    img_width = img.width
                    img_height = img.height
                # save a second, smaller, version as a thumbnail
                img.thumbnail((150, 150))
                img.save(final_place_thumbnail, format="WebP", quality=93)
                thumbnail_width = img.width
                thumbnail_height = img.height

                # NOTE(review): file_path points at the medium version, which is only written
                # when the image exceeds 512px on either side
                file = File(file_path=final_place_medium, file_name=new_filename + file_ext, alt_text=alt_text,
                            width=img_width, height=img_height, thumbnail_width=thumbnail_width,
                            thumbnail_height=thumbnail_height, thumbnail_path=final_place_thumbnail,
                            source_url=final_place.replace('app/static/', f"https://{current_app.config['SERVER_NAME']}/static/"))
                post.image = file
                db.session.add(file)
    elif type == 'video':
        form.video_url.data = form.video_url.data.strip()
        post.title = form.video_title.data
        post.body = form.video_body.data
        post.body_html = markdown_to_html(post.body)
        # Must be evaluated before post.url is overwritten on the next line
        url_changed = post.id is None or form.video_url.data != post.url
        post.url = remove_tracking_from_link(form.video_url.data.strip())
        post.type = POST_TYPE_VIDEO
        domain = domain_from_url(form.video_url.data)
        # NOTE(review): incremented on every save, including edits that keep the same URL
        domain.post_count += 1
        post.domain = domain

        if url_changed:
            # The previous image belongs to the old URL
            if post.image_id:
                remove_old_file(post.image_id)
                post.image_id = None

            if form.video_url.data.endswith('.mp4') or form.video_url.data.endswith('.webm'):
                file = File(source_url=form.video_url.data) # make_image_sizes() will take care of turning this into a still image
                post.image = file
                db.session.add(file)
            else:
                # check opengraph tags on the page and make a thumbnail if an image is available in the og:image meta tag
                opengraph = opengraph_parse(form.video_url.data)
                if opengraph and (opengraph.get('og:image', '') != '' or opengraph.get('og:image:url', '') != ''):
                    filename = opengraph.get('og:image') or opengraph.get('og:image:url')
                    filename_for_extension = filename.split('?')[0] if '?' in filename else filename
                    unused, file_extension = os.path.splitext(filename_for_extension)
                    # Image URLs starting with '/' are skipped
                    if file_extension.lower() in allowed_extensions and not filename.startswith('/'):
                        file = url_to_thumbnail_file(filename)
                        if file:
                            file.alt_text = shorten_string(opengraph.get('og:title'), 295)
                            post.image = file
                            db.session.add(file)

    elif type == 'poll':
        # No poll-specific handling here
        ...
    else:
        raise Exception('invalid post type')

    if post.id is None:
        # New post: the initial score depends on the author's reputation
        if current_user.reputation > 100:
            post.up_votes = 1
            post.score = 1
        if current_user.reputation < -100:
            post.score = -1
        post.ranking = post_ranking(post.score, utcnow())

        # Filter by phrase
        blocked_phrases_list = blocked_phrases()
        for blocked_phrase in blocked_phrases_list:
            if blocked_phrase in post.title:
                abort(401)
                return
        if post.body:
            for blocked_phrase in blocked_phrases_list:
                if blocked_phrase in post.body:
                    abort(401)
                    return

        db.session.add(post)
    else:
        # Existing post: clear its tag links before they are rebuilt below
        db.session.execute(text('DELETE FROM "post_tag" WHERE post_id = :post_id'), { 'post_id': post.id})
    post.tags = tags_from_string(form.tags.data)
    # This commit comes before the subscription code below, which reads post.id
    db.session.commit()

    # Notify author about replies
    # Remove any subscription that currently exists
    existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == post.id,
                                                                  NotificationSubscription.user_id == current_user.id,
                                                                  NotificationSubscription.type == NOTIF_POST).first()
    if existing_notification:
        db.session.delete(existing_notification)

    # Add subscription if necessary
    if form.notify_author.data:
        new_notification = NotificationSubscription(name=post.title, user_id=current_user.id, entity_id=post.id,
                                                    type=NOTIF_POST)
        db.session.add(new_notification)

    g.site.last_active = utcnow()
    db.session.commit()
def tags_from_string(tags: str) -> List[Tag]:
    """Convert a comma-separated string of hashtags into Tag objects.

    Each entry is trimmed and one leading '#' is removed. Entries that are
    empty after that (e.g. from ``"a,,b"``, a trailing comma or a lone '#')
    are skipped, as are entries for which ``find_hashtag_or_create`` returns
    nothing.

    :param tags: e.g. ``"#linux, python"``; None or blank yields an empty list
    :return: the resolved tags, in input order
    """
    if not tags:
        return []
    return_value = []
    for raw_tag in tags.split(','):
        tag = raw_tag.strip()
        if tag.startswith('#'):
            tag = tag[1:].strip()
        if not tag:
            continue
        tag_to_append = find_hashtag_or_create(tag)
        if tag_to_append:
            return_value.append(tag_to_append)
    return return_value
def delete_post_from_community(post_id):
    """Delete a post: run the task inline in debug mode, otherwise queue it on Celery."""
    if not current_app.debug:
        delete_post_from_community_task.delay(post_id)
        return
    delete_post_from_community_task(post_id)
@celery.task
def delete_post_from_community_task(post_id):
    """Delete a post and its dependencies, then federate a Delete activity.

    Nothing is federated for ``local_only`` communities. For a remote
    community the Delete is posted to the community's inbox, signed with the
    current user's key. For a local community it is wrapped in an Announce
    and sent to each following instance that has an inbox, is not blocked by
    the current user and is not banned.

    NOTE(review): this relies on flask_login's ``current_user``; confirm it is
    available when the task runs through ``.delay()`` in a Celery worker.

    :param post_id: primary key of the Post; a missing post is ignored
    """
    post = Post.query.get(post_id)
    if post is None:
        return
    # Read what the federation step needs before the row is deleted, so the
    # deleted instance is not touched after the commit.
    community = post.community
    post_ap_id = post.ap_id
    post.delete_dependencies()
    db.session.delete(post)
    db.session.commit()

    if community.local_only:
        return

    delete_json = {
        'id': f"https://{current_app.config['SERVER_NAME']}/activities/delete/{gibberish(15)}",
        'type': 'Delete',
        'actor': current_user.profile_id(),
        'audience': community.profile_id(),
        'to': [community.profile_id(), 'https://www.w3.org/ns/activitystreams#Public'],
        'published': ap_datetime(utcnow()),
        'cc': [
            current_user.followers_url()
        ],
        'object': post_ap_id,
    }

    if not community.is_local():  # this is a remote community, send it to the instance that hosts it
        post_request(community.ap_inbox_url, delete_json, current_user.private_key,
                     current_user.ap_profile_id + '#main-key')
    else:  # local community - send it to followers on remote instances
        announce = {
            "id": f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}",
            "type": 'Announce',
            "to": [
                "https://www.w3.org/ns/activitystreams#Public"
            ],
            "actor": community.ap_profile_id,
            "cc": [
                community.ap_followers_url
            ],
            '@context': default_context(),
            'object': delete_json
        }

        for instance in community.following_instances():
            if instance.inbox and not current_user.has_blocked_instance(instance.id) and not instance_banned(
                    instance.domain):
                send_to_remote_instance(instance.id, community.id, announce)
def delete_post_reply_from_community(post_reply_id):
    """Delete a reply: run the task inline in debug mode, otherwise queue it on Celery."""
    if not current_app.debug:
        delete_post_reply_from_community_task.delay(post_reply_id)
        return
    delete_post_reply_from_community_task(post_reply_id)
@celery.task
def delete_post_reply_from_community_task(post_reply_id):
    """Delete (or blank out) a reply and federate a Delete activity.

    Only the reply's author or a community moderator may do this; otherwise
    nothing happens. A reply that has replies of its own is kept and its body
    is replaced with a "Deleted by ..." notice; any other reply is removed
    together with its dependencies. Unless the community is ``local_only``,
    a Delete is then sent to the hosting instance (remote community) or
    announced to following instances (local community).

    NOTE(review): this relies on flask_login's ``current_user``; confirm it is
    available when the task runs through ``.delay()`` in a Celery worker.

    :param post_reply_id: primary key of the PostReply; a missing reply is ignored
    """
    post_reply = PostReply.query.get(post_reply_id)
    if post_reply is None:
        return
    post = post_reply.post
    community = post.community
    if not (post_reply.user_id == current_user.id or community.is_moderator()):
        return

    # Read the id before a possible delete so the deleted instance is not
    # touched after the commit.
    reply_ap_id = post_reply.ap_id
    if post_reply.has_replies():
        post_reply.body = 'Deleted by author' if post_reply.author.id == current_user.id else 'Deleted by moderator'
        post_reply.body_html = markdown_to_html(post_reply.body)
    else:
        post_reply.delete_dependencies()
        db.session.delete(post_reply)
    db.session.commit()

    # federate delete
    if community.local_only:
        return

    delete_json = {
        'id': f"https://{current_app.config['SERVER_NAME']}/activities/delete/{gibberish(15)}",
        'type': 'Delete',
        'actor': current_user.profile_id(),
        'audience': community.profile_id(),
        'to': [community.profile_id(), 'https://www.w3.org/ns/activitystreams#Public'],
        'published': ap_datetime(utcnow()),
        'cc': [
            current_user.followers_url()
        ],
        'object': reply_ap_id,
    }

    if not community.is_local():  # this is a remote community, send it to the instance that hosts it
        post_request(community.ap_inbox_url, delete_json, current_user.private_key,
                     current_user.ap_profile_id + '#main-key')
    else:  # local community - send it to followers on remote instances
        announce = {
            "id": f"https://{current_app.config['SERVER_NAME']}/activities/announce/{gibberish(15)}",
            "type": 'Announce',
            "to": [
                "https://www.w3.org/ns/activitystreams#Public"
            ],
            "actor": community.ap_profile_id,
            "cc": [
                community.ap_followers_url
            ],
            '@context': default_context(),
            'object': delete_json
        }

        for instance in community.following_instances():
            if instance.inbox and not current_user.has_blocked_instance(instance.id) and not instance_banned(
                    instance.domain):
                send_to_remote_instance(instance.id, community.id, announce)
def remove_old_file(file_id):
    """Remove the stored files of the File row ``file_id`` via ``delete_from_disk``.

    Does nothing when no such File row exists.
    """
    remove_file = File.query.get(file_id)
    if remove_file is not None:
        remove_file.delete_from_disk()
def save_icon_file(icon_file, directory='communities') -> File:
    """Store an uploaded icon, shrink it to at most 250x250 and make a 40x40 WebP thumbnail.

    Aborts with HTTP 400 when the file extension, or the image format Pillow
    detects, is not in ``allowed_extensions``.

    :param icon_file: uploaded file object providing ``filename`` and ``save()``
    :param directory: sub-folder of ``app/static/media/`` to store into; also
        used as the first word of the alt text
    :return: the new File, already added to the db session (not committed)
    """
    # check if this is an allowed type of file
    file_ext = os.path.splitext(icon_file.filename)[1]
    if file_ext.lower() not in allowed_extensions:
        abort(400)
    new_filename = gibberish(15)

    # set up the storage directory
    # Kept separate from the `directory` argument, which is still needed for the alt text.
    storage_directory = f'app/static/media/{directory}/' + new_filename[0:2] + '/' + new_filename[2:4]
    ensure_directory_exists(storage_directory)

    # save the file
    final_place = os.path.join(storage_directory, new_filename + file_ext)
    final_place_thumbnail = os.path.join(storage_directory, new_filename + '_thumbnail.webp')
    icon_file.save(final_place)

    if file_ext.lower() == '.heic':
        register_heif_opener()

    # resize if necessary
    Image.MAX_IMAGE_PIXELS = 89478485  # Pillow's limit on image size in pixels
    img = Image.open(final_place)
    if '.' + img.format.lower() in allowed_extensions:
        img = ImageOps.exif_transpose(img)
        img_width = img.width
        img_height = img.height
        if img.width > 250 or img.height > 250:
            img.thumbnail((250, 250))
            img.save(final_place)
            img_width = img.width
            img_height = img.height
        # save a second, smaller, version as a thumbnail
        img.thumbnail((40, 40))
        img.save(final_place_thumbnail, format="WebP", quality=93)
        thumbnail_width = img.width
        thumbnail_height = img.height

        file = File(file_path=final_place, file_name=new_filename + file_ext, alt_text=f'{directory} icon',
                    width=img_width, height=img_height, thumbnail_width=thumbnail_width,
                    thumbnail_height=thumbnail_height, thumbnail_path=final_place_thumbnail)
        db.session.add(file)
        return file
    else:
        abort(400)
def save_banner_file(banner_file, directory='communities') -> File:
    """Store an uploaded banner, shrink it to at most 1600x600 and make an 878x500 WebP thumbnail.

    Aborts with HTTP 400 when the file extension, or the image format Pillow
    detects, is not in ``allowed_extensions``.

    :param banner_file: uploaded file object providing ``filename`` and ``save()``
    :param directory: sub-folder of ``app/static/media/`` to store into; also
        used as the first word of the alt text
    :return: the new File, already added to the db session (not committed)
    """
    # check if this is an allowed type of file
    file_ext = os.path.splitext(banner_file.filename)[1]
    if file_ext.lower() not in allowed_extensions:
        abort(400)
    new_filename = gibberish(15)

    # set up the storage directory
    # Kept separate from the `directory` argument, which is still needed for the alt text.
    storage_directory = f'app/static/media/{directory}/' + new_filename[0:2] + '/' + new_filename[2:4]
    ensure_directory_exists(storage_directory)

    # save the file
    final_place = os.path.join(storage_directory, new_filename + file_ext)
    final_place_thumbnail = os.path.join(storage_directory, new_filename + '_thumbnail.webp')
    banner_file.save(final_place)

    if file_ext.lower() == '.heic':
        register_heif_opener()

    # resize if necessary
    Image.MAX_IMAGE_PIXELS = 89478485  # Pillow's limit on image size in pixels
    img = Image.open(final_place)
    if '.' + img.format.lower() in allowed_extensions:
        img = ImageOps.exif_transpose(img)
        img_width = img.width
        img_height = img.height
        if img.width > 1600 or img.height > 600:
            img.thumbnail((1600, 600))
            img.save(final_place)
            img_width = img.width
            img_height = img.height

        # save a second, smaller, version as a thumbnail
        img.thumbnail((878, 500))
        img.save(final_place_thumbnail, format="WebP", quality=93)
        thumbnail_width = img.width
        thumbnail_height = img.height

        file = File(file_path=final_place, file_name=new_filename + file_ext, alt_text=f'{directory} banner',
                    width=img_width, height=img_height, thumbnail_path=final_place_thumbnail,
                    thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height)
        db.session.add(file)
        return file
    else:
        abort(400)
# NB: POSTs are always signed as the community, so this is only suitable for Announce activities
def send_to_remote_instance(instance_id: int, community_id: int, payload):
    """Send ``payload`` to an instance: inline in debug mode, otherwise queued on Celery."""
    if not current_app.debug:
        send_to_remote_instance_task.delay(instance_id, community_id, payload)
        return
    send_to_remote_instance_task(instance_id, community_id, payload)
@celery.task
def send_to_remote_instance_task(instance_id: int, community_id: int, payload):
    """POST ``payload`` to an instance's inbox, signed as the community, and record the outcome.

    On success the failure counter is reset. On failure it is incremented,
    the next attempt is pushed back by ``failures ** 4`` seconds, and the
    instance is marked dormant once it has failed more than 10 times.

    :param instance_id: primary key of the target Instance
    :param community_id: primary key of the Community whose key signs the request
    :param payload: the activity to send
    """
    community = Community.query.get(community_id)
    if not community:
        return
    instance = Instance.query.get(instance_id)
    if instance is None:
        # The instance row is gone, so there is nowhere to send the payload.
        return
    if post_request(instance.inbox, payload, community.private_key, community.ap_profile_id + '#main-key'):
        instance.last_successful_send = utcnow()
        instance.failures = 0
    else:
        instance.failures += 1
        instance.most_recent_attempt = utcnow()
        instance.start_trying_again = utcnow() + timedelta(seconds=instance.failures ** 4)
        if instance.failures > 10:
            instance.dormant = True
    db.session.commit()
def community_in_list(community_id, community_list):
    """Return True when any entry of ``community_list`` has ``community_id`` as its first element."""
    return any(community_id == entry[0] for entry in community_list)