move long-running tasks to separate background process (celery + redis)

This commit is contained in:
rimu 2023-12-24 13:28:41 +13:00
parent 684e68c3fd
commit 6182240ad3
17 changed files with 595 additions and 525 deletions

1
.gitignore vendored
View file

@ -161,3 +161,4 @@ cython_debug/
.idea/
app/static/*.css.map
/app/static/media/
celery_worker.py

View file

@ -13,6 +13,7 @@ from flask_mail import Mail
from flask_moment import Moment
from flask_babel import Babel, lazy_gettext as _l
from flask_caching import Cache
from celery import Celery
from sqlalchemy_searchable import make_searchable
from config import Config
@ -28,6 +29,7 @@ bootstrap = Bootstrap5()
moment = Moment()
babel = Babel()
cache = Cache()
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)
def create_app(config_class=Config):
@ -43,6 +45,7 @@ def create_app(config_class=Config):
make_searchable(db.metadata)
babel.init_app(app, locale_selector=get_locale)
cache.init_app(app)
celery.conf.update(app.config)
from app.main import bp as main_bp
app.register_blueprint(main_bp)

View file

@ -1,4 +1,4 @@
from app import db, constants, cache
from app import db, constants, cache, celery
from app.activitypub import bp
from flask import request, Response, current_app, abort, jsonify, json, g
@ -8,7 +8,7 @@ from app.post.routes import continue_discussion, show_post
from app.user.routes import show_profile
from app.constants import POST_TYPE_LINK, POST_TYPE_IMAGE, SUBSCRIPTION_MEMBER
from app.models import User, Community, CommunityJoinRequest, CommunityMember, CommunityBan, ActivityPubLog, Post, \
PostReply, Instance, PostVote, PostReplyVote, File, AllowedInstances, BannedInstances, utcnow
PostReply, Instance, PostVote, PostReplyVote, File, AllowedInstances, BannedInstances, utcnow, Site
from app.activitypub.util import public_key, users_total, active_half_year, active_month, local_posts, local_comments, \
post_to_activity, find_actor_or_create, default_context, instance_blocked, find_reply_parent, find_liked_object, \
lemmy_site_data, instance_weight, is_activitypub_request, downvote_post_reply, downvote_post, upvote_post_reply, \
@ -98,7 +98,7 @@ def nodeinfo2():
"localPosts": local_posts(),
"localComments": local_comments()
},
"openRegistrations": True
"openRegistrations": g.site.registration_mode == 'Open'
}
return jsonify(nodeinfo_data)
@ -278,10 +278,8 @@ def shared_inbox():
db.session.add(activity_log)
db.session.commit()
return ''
else:
if 'id' in request_json:
activity_log.activity_id = request_json['id']
if 'id' in request_json:
if activity_already_ingested(request_json['id']): # Lemmy has an extremely short POST timeout and tends to retry unnecessarily. Ignore their retries.
activity_log.result = 'ignored'
db.session.add(activity_log)
@ -290,6 +288,7 @@ def shared_inbox():
activity_log.activity_id = request_json['id']
activity_log.activity_json = json.dumps(request_json)
activity_log.result = 'processing'
# Mastodon spams the whole fediverse whenever any of their users are deleted. Ignore them, for now. The Activity includes the Actor signature so it should be possible to verify the POST and do the delete if valid, without a call to find_actor_or_create() and all the network activity that involves. One day.
if 'type' in request_json and request_json['type'] == 'Delete' and request_json['id'].endswith('#delete'):
@ -298,10 +297,40 @@ def shared_inbox():
db.session.add(activity_log)
db.session.commit()
return ''
else:
db.session.add(activity_log)
db.session.commit()
else:
activity_log.activity_id = ''
activity_log.activity_json = json.dumps(request_json)
db.session.add(activity_log)
db.session.commit()
actor = find_actor_or_create(request_json['actor']) if 'actor' in request_json else None
if actor is not None:
if HttpSignature.verify_request(request, actor.public_key, skip_date=True):
if current_app.debug:
process_inbox_request(request_json, activity_log.id)
else:
process_inbox_request.delay(request_json, activity_log.id)
return ''
else:
activity_log.exception_message = 'Could not verify signature'
else:
actor_name = request_json['actor'] if 'actor' in request_json else ''
activity_log.exception_message = f'Actor could not be found: {actor_name}'
if activity_log.exception_message is not None:
activity_log.result = 'failure'
db.session.commit()
return ''
@celery.task
def process_inbox_request(request_json, activitypublog_id):
with current_app.app_context():
activity_log = ActivityPubLog.query.get(activitypublog_id)
site = Site.query.get(1) # can't use g.site because celery doesn't use Flask's g variable
if 'type' in request_json:
activity_log.activity_type = request_json['type']
if not instance_blocked(request_json['id']):
@ -313,32 +342,27 @@ def shared_inbox():
if community_ap_id == 'https://www.w3.org/ns/activitystreams#Public': # kbin does this when posting a reply
if 'to' in request_json['object'] and request_json['object']['to']:
community_ap_id = request_json['object']['to'][0]
if community_ap_id == 'https://www.w3.org/ns/activitystreams#Public' and 'cc' in request_json['object'] and request_json['object']['cc']:
if community_ap_id == 'https://www.w3.org/ns/activitystreams#Public' and 'cc' in \
request_json['object'] and request_json['object']['cc']:
community_ap_id = request_json['object']['cc'][0]
elif 'cc' in request_json['object'] and request_json['object']['cc']:
community_ap_id = request_json['object']['cc'][0]
community = find_actor_or_create(community_ap_id)
user = find_actor_or_create(user_ap_id)
if (user and not user.is_local()) and community:
user.last_seen = community.last_active = g.site.last_active = utcnow()
user.last_seen = community.last_active = site.last_active = utcnow()
object_type = request_json['object']['type']
new_content_types = ['Page', 'Article', 'Link', 'Note']
if object_type in new_content_types: # create a new post
in_reply_to = request_json['object']['inReplyTo'] if 'inReplyTo' in \
request_json[
'object'] else None
in_reply_to = request_json['object']['inReplyTo'] if 'inReplyTo' in request_json['object'] else None
if not in_reply_to:
post = Post(user_id=user.id, community_id=community.id,
title=request_json['object']['name'],
comments_enabled=request_json['object'][
'commentsEnabled'],
sticky=request_json['object']['stickied'] if 'stickied' in
request_json[
'object'] else False,
comments_enabled=request_json['object']['commentsEnabled'],
sticky=request_json['object']['stickied'] if 'stickied' in request_json['object'] else False,
nsfw=request_json['object']['sensitive'],
nsfl=request_json['object']['nsfl'] if 'nsfl' in request_json[
'object'] else False,
nsfl=request_json['object']['nsfl'] if 'nsfl' in request_json['object'] else False,
ap_id=request_json['object']['id'],
ap_create_id=request_json['id'],
ap_announce_id=None,
@ -346,16 +370,13 @@ def shared_inbox():
up_votes=1,
score=instance_weight(user.ap_domain)
)
if 'source' in request_json['object'] and \
request_json['object']['source'][
'mediaType'] == 'text/markdown':
if 'source' in request_json['object'] and request_json['object']['source']['mediaType'] == 'text/markdown':
post.body = request_json['object']['source']['content']
post.body_html = markdown_to_html(post.body)
elif 'content' in request_json['object'] and request_json['object']['content'] is not None:
post.body_html = allowlist_html(request_json['object']['content'])
post.body = html_to_markdown(post.body_html)
if 'attachment' in request_json['object'] and \
len(request_json['object']['attachment']) > 0 and \
if 'attachment' in request_json['object'] and len(request_json['object']['attachment']) > 0 and \
'type' in request_json['object']['attachment'][0]:
if request_json['object']['attachment'][0]['type'] == 'Link':
post.url = request_json['object']['attachment'][0]['href']
@ -369,7 +390,6 @@ def shared_inbox():
else:
post = None
activity_log.exception_message = domain.name + ' is blocked by admin'
activity_log.result = 'failure'
if 'image' in request_json['object']:
image = File(source_url=request_json['object']['image']['url'])
db.session.add(image)
@ -398,13 +418,11 @@ def shared_inbox():
ap_create_id=request_json['id'],
ap_announce_id=None)
if 'source' in request_json['object'] and \
request_json['object']['source'][
'mediaType'] == 'text/markdown':
request_json['object']['source']['mediaType'] == 'text/markdown':
post_reply.body = request_json['object']['source']['content']
post_reply.body_html = markdown_to_html(post_reply.body)
elif 'content' in request_json['object']:
post_reply.body_html = allowlist_html(
request_json['object']['content'])
post_reply.body_html = allowlist_html(request_json['object']['content'])
post_reply.body = html_to_markdown(post_reply.body_html)
if post_reply is not None:
@ -434,7 +452,7 @@ def shared_inbox():
community = find_actor_or_create(community_ap_id)
user = find_actor_or_create(user_ap_id)
if (user and not user.is_local()) and community:
user.last_seen = community.last_active = g.site.last_active = utcnow()
user.last_seen = community.last_active = site.last_active = utcnow()
object_type = request_json['object']['object']['type']
new_content_types = ['Page', 'Article', 'Link', 'Note']
if object_type in new_content_types: # create a new post
@ -446,7 +464,7 @@ def shared_inbox():
title=request_json['object']['object']['name'],
comments_enabled=request_json['object']['object']['commentsEnabled'],
sticky=request_json['object']['object']['stickied'] if 'stickied' in request_json['object']['object'] else False,
nsfw=request_json['object']['object']['sensitive'],
nsfw=request_json['object']['object']['sensitive'] if 'sensitive' in request_json['object']['object'] else False,
nsfl=request_json['object']['object']['nsfl'] if 'nsfl' in request_json['object']['object'] else False,
ap_id=request_json['object']['object']['id'],
ap_create_id=request_json['object']['id'],
@ -475,7 +493,6 @@ def shared_inbox():
else:
post = None
activity_log.exception_message = domain.name + ' is blocked by admin'
activity_log.result = 'failure'
if 'image' in request_json['object']['object']:
image = File(source_url=request_json['object']['object']['image']['url'])
db.session.add(image)
@ -497,8 +514,7 @@ def shared_inbox():
ap_create_id=request_json['object']['id'],
ap_announce_id=request_json['id'])
if 'source' in request_json['object']['object'] and \
request_json['object']['object']['source'][
'mediaType'] == 'text/markdown':
request_json['object']['object']['source']['mediaType'] == 'text/markdown':
post_reply.body = request_json['object']['object']['source']['content']
post_reply.body_html = markdown_to_html(post_reply.body)
elif 'content' in request_json['object']['object']:
@ -546,7 +562,7 @@ def shared_inbox():
# todo: if vote was on content in local community, federate the vote out to followers
elif request_json['object']['type'] == 'Dislike':
activity_log.activity_type = request_json['object']['type']
if g.site.enable_downvotes is False:
if site.enable_downvotes is False:
activity_log.exception_message = 'Dislike ignored because of allow_dislike setting'
else:
user_ap_id = request_json['object']['actor']
@ -578,8 +594,7 @@ def shared_inbox():
community = find_actor_or_create(community_ap_id)
if user is not None and community is not None:
# check if user is banned from this community
banned = CommunityBan.query.filter_by(user_id=user.id,
community_id=community.id).first()
banned = CommunityBan.query.filter_by(user_id=user.id, community_id=community.id).first()
if banned is None:
user.last_seen = utcnow()
if community_membership(user, community) != SUBSCRIPTION_MEMBER:
@ -625,8 +640,7 @@ def shared_inbox():
user = find_actor_or_create(user_ap_id)
community = find_actor_or_create(community_ap_id)
if user and community:
join_request = CommunityJoinRequest.query.filter_by(user_id=user.id,
community_id=community.id).first()
join_request = CommunityJoinRequest.query.filter_by(user_id=user.id, community_id=community.id).first()
if join_request:
member = CommunityMember(user_id=user.id, community_id=community.id)
db.session.add(member)
@ -644,8 +658,7 @@ def shared_inbox():
if user and community:
user.last_seen = utcnow()
member = CommunityMember.query.filter_by(user_id=user.id, community_id=community.id).first()
join_request = CommunityJoinRequest.query.filter_by(user_id=user.id,
community_id=community.id).first()
join_request = CommunityJoinRequest.query.filter_by(user_id=user.id, community_id=community.id).first()
if member:
db.session.delete(member)
if join_request:
@ -720,8 +733,7 @@ def shared_inbox():
if request_json['object']['type'] == 'Page': # Editing a post
post = Post.query.filter_by(ap_id=request_json['object']['id']).first()
if post:
if 'source' in request_json['object'] and \
request_json['object']['source']['mediaType'] == 'text/markdown':
if 'source' in request_json['object'] and request_json['object']['source']['mediaType'] == 'text/markdown':
post.body = request_json['object']['source']['content']
post.body_html = markdown_to_html(post.body)
elif 'content' in request_json['object']:
@ -733,8 +745,7 @@ def shared_inbox():
elif request_json['object']['type'] == 'Note': # Editing a reply
reply = PostReply.query.filter_by(ap_id=request_json['object']['id']).first()
if reply:
if 'source' in request_json['object'] and \
request_json['object']['source']['mediaType'] == 'text/markdown':
if 'source' in request_json['object'] and request_json['object']['source']['mediaType'] == 'text/markdown':
reply.body = request_json['object']['source']['content']
reply.body_html = markdown_to_html(reply.body)
elif 'content' in request_json['object']:
@ -808,16 +819,10 @@ def shared_inbox():
post.flush_cache()
else:
activity_log.exception_message = 'Instance banned'
else:
activity_log.exception_message = 'Could not verify signature'
else:
activity_log.exception_message = 'Actor could not be found: ' + request_json['actor']
if activity_log.exception_message is not None:
activity_log.result = 'failure'
db.session.add(activity_log)
db.session.commit()
return ''
@bp.route('/c/<actor>/outbox', methods=['GET'])

View file

@ -215,7 +215,6 @@ def find_actor_or_create(actor: str) -> Union[User, Community, None]:
if 'rel' in links and links['rel'] == 'self': # this contains the URL of the activitypub profile
type = links['type'] if 'type' in links else 'application/activity+json'
# retrieve the activitypub profile
print('****', links['href'])
actor_data = get_request(links['href'], headers={'Accept': type})
# to see the structure of the json contained in actor_data, do a GET to https://lemmy.world/c/technology with header Accept: application/activity+json
if actor_data.status_code == 200:
@ -462,7 +461,10 @@ def find_instance_id(server):
try:
instance_data = get_request(f"https://{server}", headers={'Accept': 'application/activity+json'})
except:
return None
new_instance = Instance(domain=server, software='unknown', created_at=utcnow())
db.session.add(new_instance)
db.session.commit()
return new_instance.id
if instance_data.status_code == 200:
try:
instance_json = instance_data.json()
@ -487,7 +489,11 @@ def find_instance_id(server):
db.session.add(new_instance)
db.session.commit()
return new_instance.id
return None
else:
new_instance = Instance(domain=server, software='unknown', created_at=utcnow())
db.session.add(new_instance)
db.session.commit()
return new_instance.id
# alter the effect of upvotes based on their instance. Default to 1.0

View file

@ -1,5 +1,5 @@
from flask import current_app, render_template, escape
from app import db
from app import db, celery
from flask_babel import _, lazy_gettext as _l # todo: set the locale based on account_id so that _() works
import boto3
from botocore.exceptions import ClientError
@ -9,6 +9,7 @@ AWS_REGION = "ap-southeast-2"
CHARSET = "UTF-8"
@celery.task
def send_async_email(subject, sender, recipients, text_body, html_body, reply_to):
if type(recipients) == str:
recipients = [recipients]
@ -62,5 +63,7 @@ def send_async_email(subject, sender, recipients, text_body, html_body, reply_to
def send_email(subject, sender, recipients: List[str], text_body, html_body, reply_to=None):
# todo: make async or threaded
if current_app.debug:
send_async_email(subject, sender, recipients, text_body, html_body, reply_to)
else:
send_async_email.delay(subject, sender, recipients, text_body, html_body, reply_to)

View file

@ -107,6 +107,7 @@ def verification_warning():
flash(_('Please click the link in your email inbox to verify your account.'), 'warning')
@cache.cached(timeout=6)
def activitypub_application():
application_data = {
'@context': default_context(),

View file

@ -488,6 +488,16 @@ fieldset legend {
max-width: 100%;
}
.render_username {
display: inline;
}
.render_username a img {
width: 20px;
height: 20px;
border-radius: 50%;
vertical-align: bottom;
}
.comments > .comment {
margin-left: 0;
border-top: solid 1px #bbb;

View file

@ -198,6 +198,16 @@ nav, etc which are used site-wide */
}
}
.render_username {
display: inline;
a img {
width: 20px;
height: 20px;
border-radius: 50%;
vertical-align: bottom;
}
}
.comments > .comment {
margin-left: 0;
border-top: solid 1px $grey;

View file

@ -1,12 +1,18 @@
{% macro render_username(user) %}
<span class="render_username">
{% if user.deleted %}
[deleted]
{% else %}
{% if user.avatar_id %}
<a href="/u/{{ user.link() }}" title="{{ user.ap_id if user.ap_id != none else user.user_name }}">
<img src="{{ user.avatar_image() }}" alt="Avatar" /></a>
{% endif %}
<a href="/u/{{ user.link() }}" title="{{ user.ap_id if user.ap_id != none else user.user_name }}">{{ user.user_name }}</a>
{% if user.created_recently() %}
<span class="fe fe-new-account" title="New account"> </span>
{% endif %}
{% endif %}
</span>
{% endmacro %}
<!doctype html>
<html lang="en">

View file

@ -84,8 +84,8 @@
<h2>{{ _('About community') }}</h2>
</div>
<div class="card-body">
<p>{{ community.description|safe }}</p>
<p>{{ community.rules|safe }}</p>
<p>{{ community.description_html|safe }}</p>
<p>{{ community.rules_html|safe }}</p>
{% if len(mods) > 0 and not community.private_mods %}
<h3>Moderators</h3>
<ul>

View file

@ -134,8 +134,8 @@
<h2>{{ _('About community') }}</h2>
</div>
<div class="card-body">
<p>{{ post.community.description|safe }}</p>
<p>{{ post.community.rules|safe }}</p>
<p>{{ post.community.description_html|safe }}</p>
<p>{{ post.community.rules_html|safe }}</p>
{% if len(mods) > 0 and not post.community.private_mods %}
<h3>Moderators</h3>
<ol>

View file

@ -42,7 +42,6 @@ def render_template(template_name: str, **context) -> Response:
def request_etag_matches(etag):
print(str(request.headers))
if 'If-None-Match' in request.headers:
old_etag = request.headers['If-None-Match']
return old_etag == etag

11
celery_worker.default.py Normal file
View file

@ -0,0 +1,11 @@
#!/usr/bin/env python
import os
from app import celery, create_app
app = create_app()
if not app.debug:
os.environ['DATABASE_URL'] = 'postgresql+psycopg2://pyfedi:pyfedi@127.0.0.1/pyfedi'
os.environ['SERVER_NAME'] = 'piefed.ngrok.app'
app.app_context().push()

View file

@ -27,4 +27,6 @@ class Config(object):
CACHE_DEFAULT_TIMEOUT = 300
CACHE_THRESHOLD = 1000
CACHE_KEY_PREFIX = 'pyfedi'
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') or 'redis://localhost:6379/0'
RESULT_BACKEND = os.environ.get('RESULT_BACKEND') or 'redis://localhost:6379/0'
SQLALCHEMY_ECHO = False # set to true to see SQL in console

12
dev_notes.txt Normal file
View file

@ -0,0 +1,12 @@
for celery, run this:
celery -A celery_worker.celery worker --loglevel=INFO
on prod web server, celery is managed by systemd: /etc/default/celeryd and /etc/systemd/system/celeryd.service
sudo systemctl stop celeryd
sudo systemctl restart celeryd or sudo service celeryd restart
*** check for celery-related problems by looking in /var/log/celery ***

View file

@ -144,7 +144,6 @@ environment
https://sh.itjust.works/c/sewingrepairing
https://lemmy.world/c/fuckcars
https://lemmy.world/c/evs
https://feddit.uk/c/evs
https://slrpnk.net/c/solarpunk
https://slrpnk.net/c/climate
https://slrpnk.net/c/energy

View file

@ -26,3 +26,5 @@ Pillow
pillow-heif
opengraph-parse=0.0.6
feedgen==0.9.0
celery==5.3.6
redis==5.0.1