pyfedi/app/shared/tasks/flags.py

65 lines
1.6 KiB
Python
Raw Normal View History

2024-11-02 23:56:56 +00:00
from app import celery
from app.activitypub.signature import default_context, post_request
2025-01-18 17:56:09 +00:00
from app.models import CommunityBan, Post, PostReply, User
2024-11-02 23:56:56 +00:00
from app.utils import gibberish, instance_banned
from flask import current_app
""" JSON format
Flag:
{
'id':
'type':
'actor':
'object':
'@context':
'audience':
'to': []
'summary':
}
"""
@celery.task
def report_reply(send_async, user_id, reply_id, summary):
    """Celery task: report a single reply on behalf of a user.

    Looks up the ``PostReply`` whose id is ``reply_id`` and hands it to
    ``report_object`` together with ``user_id`` and ``summary``.

    ``send_async`` is accepted as the first argument but is not read by
    this function body.

    The lookup uses ``.one()``, so a missing (or duplicated) row surfaces
    as an exception from the query instead of being silently skipped.
    """
    matching_replies = PostReply.query.filter_by(id=reply_id)
    report_object(user_id, matching_replies.one(), summary)
2025-01-18 17:56:09 +00:00
@celery.task
def report_post(send_async, user_id, post_id, summary):
    """Celery task: report a single post on behalf of a user.

    Looks up the ``Post`` whose id is ``post_id`` and hands it to
    ``report_object`` together with ``user_id`` and ``summary``.

    ``send_async`` is accepted as the first argument but is not read by
    this function body.

    The lookup uses ``.one()``, so a missing (or duplicated) row surfaces
    as an exception from the query instead of being silently skipped.
    """
    matching_posts = Post.query.filter_by(id=post_id)
    report_object(user_id, matching_posts.one(), summary)
2024-11-02 23:56:56 +00:00
def report_object(user_id, object, summary):
    """Build a ``Flag`` activity for ``object`` and post it to its community's inbox.

    Args:
        user_id: id of the reporting ``User``; looked up with ``.one()``.
        object: the reported item. It must expose ``.community`` and
            ``.public_url()`` (the callers in this module pass a ``Post``
            or a ``PostReply``).
        summary: text placed in the activity's ``summary`` field.

    Nothing is sent, and the function returns ``None`` early, when any of
    the following holds:

    * the community is ``local_only`` or its instance is not ``online()``;
    * a ``CommunityBan`` row exists for this user in this community;
    * the community is not local and either the user has blocked its
      instance or ``instance_banned`` is true for the instance's domain.
    """
    reporter = User.query.filter_by(id=user_id).one()
    target = object.community

    # Guard 1: community never federates, or its instance is unreachable.
    if target.local_only or not target.instance.online():
        return

    # Guard 2: the reporter is banned from this community.
    if CommunityBan.query.filter_by(user_id=user_id, community_id=target.id).first():
        return

    # Guard 3: remote community on an instance that is blocked by the
    # reporter or banned. Short-circuiting keeps the instance checks from
    # running for local communities.
    if not target.is_local() and (
        reporter.has_blocked_instance(target.instance.id)
        or instance_banned(target.instance.domain)
    ):
        return

    activity_id = f"https://{current_app.config['SERVER_NAME']}/activities/flag/{gibberish(15)}"
    recipients = [target.public_url()]
    activity = {
        'id': activity_id,
        'type': 'Flag',
        'actor': reporter.public_url(),
        'object': object.public_url(),
        '@context': default_context(),
        'audience': target.public_url(),
        'to': recipients,
        'summary': summary
    }

    # The request is signed with the reporter's private key; the key id is
    # the reporter's public URL with a '#main-key' fragment.
    inbox_url = target.ap_inbox_url
    signing_key = reporter.private_key
    key_id = reporter.public_url() + '#main-key'
    post_request(inbox_url, activity, signing_key, key_id)