rework update_post_from_activity #351

This commit is contained in:
freamon 2024-11-27 15:29:22 +00:00
parent e28550427b
commit 01e28a20b5

View file

@ -32,7 +32,7 @@ from app.utils import get_request, allowlist_html, get_setting, ap_datetime, mar
microblog_content_to_title, generate_image_from_video_url, is_video_url, \
notification_subscribers, communities_banned_from, actor_contains_blocked_words, \
html_to_text, add_to_modlog_activitypub, joined_communities, \
moderating_communities, get_task_session
moderating_communities, get_task_session, is_video_hosting_site, opengraph_parse
from sqlalchemy import or_
@ -1753,13 +1753,7 @@ def update_post_reply_from_activity(reply: PostReply, request_json: dict):
def update_post_from_activity(post: Post, request_json: dict):
if 'name' not in request_json['object']: # Microblog posts
name = "[Microblog]"
else:
name = request_json['object']['name']
nsfl_in_title = '[NSFL]' in name.upper() or '(NSFL)' in name.upper()
post.title = name
# redo body without checking if it's changed
if 'content' in request_json['object'] and request_json['object']['content'] is not None:
if 'mediaType' in request_json['object'] and request_json['object']['mediaType'] == 'text/html':
post.body_html = allowlist_html(request_json['object']['content'])
@ -1776,128 +1770,42 @@ def update_post_from_activity(post: Post, request_json: dict):
request_json['object']['content'] = '<p>' + request_json['object']['content'] + '</p>'
post.body_html = allowlist_html(request_json['object']['content'])
post.body = html_to_text(post.body_html)
if name == "[Microblog]":
autogenerated_title = microblog_content_to_title(post.body_html)
if len(autogenerated_title) < 20:
name += ' ' + autogenerated_title
else:
name = autogenerated_title
nsfl_in_title = '[NSFL]' in name.upper() or '(NSFL)' in name.upper()
post.title = name
# Language
if 'language' in request_json['object'] and isinstance(request_json['object']['language'], dict):
language = find_language_or_create(request_json['object']['language']['identifier'], request_json['object']['language']['name'])
post.language_id = language.id
# Links
old_url = post.url
old_image_id = post.image_id
post.url = ''
if request_json['object']['type'] == 'Video':
post.type = POST_TYPE_VIDEO
# PeerTube URL isn't going to change, so set to old_url to prevent this function changing type or icon
post.url = old_url
if 'attachment' in request_json['object'] and len(request_json['object']['attachment']) > 0 and \
'type' in request_json['object']['attachment'][0]:
alt_text = None
if request_json['object']['attachment'][0]['type'] == 'Link':
post.url = request_json['object']['attachment'][0]['href'] # Lemmy < 0.19.4
if request_json['object']['attachment'][0]['type'] == 'Document':
post.url = request_json['object']['attachment'][0]['url'] # Mastodon
if 'name' in request_json['object']['attachment'][0]:
alt_text = request_json['object']['attachment'][0]['name']
if request_json['object']['attachment'][0]['type'] == 'Image':
post.url = request_json['object']['attachment'][0]['url'] # PixelFed / PieFed / Lemmy >= 0.19.4
if 'name' in request_json['object']['attachment'][0]:
alt_text = request_json['object']['attachment'][0]['name']
if post.url == '':
post.type = POST_TYPE_ARTICLE
# title
old_title = post.title
if 'name' in request_json['object']:
new_title = request_json['object']['name']
post.microblog = False
else:
post.url = remove_tracking_from_link(post.url)
if (post.url and post.url != old_url) or (post.url == '' and old_url != ''):
if post.image_id:
old_image = File.query.get(post.image_id)
post.image_id = None
old_image.delete_from_disk()
File.query.filter_by(id=old_image_id).delete()
post.image = None
if (post.url and post.url != old_url):
if is_image_url(post.url):
post.type = POST_TYPE_IMAGE
if 'image' in request_json['object'] and 'url' in request_json['object']['image']:
image = File(source_url=request_json['object']['image']['url'])
else:
image = File(source_url=post.url)
if alt_text:
image.alt_text = alt_text
db.session.add(image)
post.image = image
elif is_video_url(post.url):
post.type = POST_TYPE_VIDEO
image = File(source_url=post.url)
db.session.add(image)
post.image = image
autogenerated_title = microblog_content_to_title(post.body_html)
if len(autogenerated_title) < 20:
new_title = '[Microblog] ' + autogenerated_title.strip()
else:
post.type = POST_TYPE_LINK
domain = domain_from_url(post.url)
# notify about links to banned websites.
already_notified = set() # often admins and mods are the same people - avoid notifying them twice
if domain.notify_mods:
for community_member in post.community.moderators():
notify = Notification(title='Suspicious content', url=post.ap_id,
user_id=community_member.user_id,
author_id=1)
db.session.add(notify)
already_notified.add(community_member.user_id)
if domain.notify_admins:
for admin in Site.admins():
if admin.id not in already_notified:
notify = Notification(title='Suspicious content',
url=post.ap_id, user_id=admin.id,
author_id=1)
db.session.add(notify)
if not domain.banned:
domain.post_count += 1
post.domain = domain
else:
post.url = old_url # don't change if url changed from non-banned domain to banned domain
new_title = autogenerated_title.strip()
post.microblog = True
# Fix-up cross posts (Posts which link to the same url as other posts)
if post.cross_posts is not None:
old_cross_posts = Post.query.filter(Post.id.in_(post.cross_posts)).all()
post.cross_posts.clear()
for ocp in old_cross_posts:
if ocp.cross_posts is not None and post.id in ocp.cross_posts:
ocp.cross_posts.remove(post.id)
new_cross_posts = Post.query.filter(Post.id != post.id, Post.url == post.url, Post.deleted == False,
Post.posted_at > utcnow() - timedelta(days=6)).all()
for ncp in new_cross_posts:
if ncp.cross_posts is None:
ncp.cross_posts = [post.id]
else:
ncp.cross_posts.append(post.id)
if post.cross_posts is None:
post.cross_posts = [ncp.id]
else:
post.cross_posts.append(ncp.id)
if post is not None:
if 'image' in request_json['object'] and post.image is None:
image = File(source_url=request_json['object']['image']['url'])
db.session.add(image)
db.session.commit()
post.image_id = image.id
db.session.add(post)
db.session.commit()
if post.image_id and post.image_id != old_image_id:
make_image_sizes(post.image_id, 170, 512, 'posts') # the 512 sized image is for masonry view
if old_title != new_title:
post.title = new_title
if '[NSFL]' in new_title.upper() or '(NSFL)' in new_title.upper():
post.nsfl = True
if '[NSFW]' in new_title.upper() or '(NSFW)' in new_title.upper():
post.nsfw = True
if 'sensitive' in request_json['object']:
post.nsfw = request_json['object']['sensitive']
if nsfl_in_title:
post.nsfl = True
elif 'nsfl' in request_json['object']:
if 'nsfl' in request_json['object']:
post.nsfl = request_json['object']['nsfl']
# Language
old_language_id = post.language_id
new_language = None
if 'language' in request_json['object'] and isinstance(request_json['object']['language'], dict):
new_language = find_language_or_create(request_json['object']['language']['identifier'], request_json['object']['language']['name'])
elif 'contentMap' in request_json['object'] and isinstance(request_json['object']['contentMap'], dict):
new_language = find_language(next(iter(request_json['object']['contentMap'])))
if new_language and (new_language.id != old_language_id):
post.language_id = new_language.id
# Tags
if 'tag' in request_json['object'] and isinstance(request_json['object']['tag'], list):
db.session.execute(text('DELETE FROM "post_tag" WHERE post_id = :post_id'), {'post_id': post.id})
for json_tag in request_json['object']['tag']:
@ -1906,9 +1814,129 @@ def update_post_from_activity(post: Post, request_json: dict):
hashtag = find_hashtag_or_create(json_tag['name'])
if hashtag:
post.tags.append(hashtag)
post.comments_enabled = request_json['object']['commentsEnabled'] if 'commentsEnabled' in request_json['object'] else True
post.edited_at = utcnow()
if request_json['object']['type'] == 'Video':
# return now for PeerTube, otherwise rest of this function breaks the post
# consider querying the Likes endpoint (that mostly seems to be what Updates are about)
return
# Links
old_url = post.url
new_url = None
if 'attachment' in request_json['object'] and len(request_json['object']['attachment']) > 0 and \
'type' in request_json['object']['attachment'][0]:
if request_json['object']['attachment'][0]['type'] == 'Link':
new_url = request_json['object']['attachment'][0]['href'] # Lemmy < 0.19.4
if request_json['object']['attachment'][0]['type'] == 'Document':
new_url = request_json['object']['attachment'][0]['url'] # Mastodon
if request_json['object']['attachment'][0]['type'] == 'Image':
new_url = request_json['object']['attachment'][0]['url'] # PixelFed / PieFed / Lemmy >= 0.19.4
if new_url:
new_url = remove_tracking_from_link(new_url)
new_domain = domain_from_url(new_url)
if new_domain.banned:
db.session.commit()
return # reject change to url if new domain is banned
old_db_entry_to_delete = None
if old_url != new_url:
if post.image:
post.image.delete_from_disk()
old_db_entry_to_delete = post.image_id
if new_url:
post.url = new_url
image = None
if is_image_url(new_url):
post.type = POST_TYPE_IMAGE
image = File(source_url=new_url)
if 'name' in request_json['object']['attachment'][0] and request_json['object']['attachment'][0]['name'] is not None:
image.alt_text = request_json['object']['attachment'][0]['name']
elif is_video_url(new_url):
post.type = POST_TYPE_VIDEO
image = File(source_url=new_url)
else:
if 'image' in request_json['object'] and 'url' in request_json['object']['image']:
image = File(source_url=request_json['object']['image']['url'])
else:
# Let's see if we can do better than the source instance did!
tn_url = new_url
if tn_url[:32] == 'https://www.youtube.com/watch?v=':
tn_url = 'https://youtu.be/' + tn_url[32:43] # better chance of thumbnail from youtu.be than youtube.com
opengraph = opengraph_parse(tn_url)
if opengraph and (opengraph.get('og:image', '') != '' or opengraph.get('og:image:url', '') != ''):
filename = opengraph.get('og:image') or opengraph.get('og:image:url')
if not filename.startswith('/'):
image = File(source_url=filename, alt_text=shorten_string(opengraph.get('og:title'), 295))
if is_video_hosting_site(new_url):
post.type = POST_TYPE_VIDEO
else:
post.type = POST_TYPE_LINK
if image:
db.session.add(image)
db.session.commit()
post.image = image
make_image_sizes(image.id, 170, 512, 'posts') # the 512 sized image is for masonry view
else:
old_db_entry_to_delete = None
# url domain
old_domain = domain_from_url(old_url) if old_url else None
if old_domain != new_domain:
# notify about links to banned websites.
already_notified = set() # often admins and mods are the same people - avoid notifying them twice
if new_domain.notify_mods:
for community_member in post.community.moderators():
notify = Notification(title='Suspicious content', url=post.ap_id,
user_id=community_member.user_id,
author_id=1)
db.session.add(notify)
already_notified.add(community_member.user_id)
if new_domain.notify_admins:
for admin in Site.admins():
if admin.id not in already_notified:
notify = Notification(title='Suspicious content',
url=post.ap_id, user_id=admin.id,
author_id=1)
db.session.add(notify)
new_domain.post_count += 1
post.domain = new_domain
# Fix-up cross posts (Posts which link to the same url as other posts)
if post.cross_posts is not None:
old_cross_posts = Post.query.filter(Post.id.in_(post.cross_posts)).all()
post.cross_posts.clear()
for ocp in old_cross_posts:
if ocp.cross_posts is not None and post.id in ocp.cross_posts:
ocp.cross_posts.remove(post.id)
new_cross_posts = Post.query.filter(Post.id != post.id, Post.url == new_url, Post.deleted == False,
Post.posted_at > utcnow() - timedelta(days=6)).all()
for ncp in new_cross_posts:
if ncp.cross_posts is None:
ncp.cross_posts = [post.id]
else:
ncp.cross_posts.append(post.id)
if post.cross_posts is None:
post.cross_posts = [ncp.id]
else:
post.cross_posts.append(ncp.id)
else:
post.type = POST_TYPE_ARTICLE
post.url = ''
if post.cross_posts is not None: # unlikely, but not impossible
old_cross_posts = Post.query.filter(Post.id.in_(post.cross_posts)).all()
post.cross_posts.clear()
for ocp in old_cross_posts:
if ocp.cross_posts is not None and post.id in ocp.cross_posts:
ocp.cross_posts.remove(post.id)
db.session.commit()
if old_db_entry_to_delete:
File.query.filter_by(id=old_db_entry_to_delete).delete()
db.session.commit()
def undo_downvote(activity_log, comment, post, target_ap_id, user):