mirror of
https://codeberg.org/rimu/pyfedi
synced 2025-02-02 16:21:32 -08:00
use separate DB session for common celery tasks to reduce conflicts
This commit is contained in:
parent
e60eb5b761
commit
53e90bb8c1
4 changed files with 47 additions and 31 deletions
|
@ -32,7 +32,7 @@ def get_locale():
|
|||
return 'en'
|
||||
|
||||
|
||||
db = SQLAlchemy() # engine_options={'pool_size': 5, 'max_overflow': 10} # session_options={"autoflush": False}
|
||||
db = SQLAlchemy(session_options={"autoflush": False}) # engine_options={'pool_size': 5, 'max_overflow': 10} # session_options={"autoflush": False}
|
||||
migrate = Migrate()
|
||||
login = LoginManager()
|
||||
login.login_view = 'auth.login'
|
||||
|
|
|
@ -30,7 +30,7 @@ from app.utils import get_request, allowlist_html, get_setting, ap_datetime, mar
|
|||
microblog_content_to_title, generate_image_from_video_url, is_video_url, \
|
||||
notification_subscribers, communities_banned_from, actor_contains_blocked_words, \
|
||||
html_to_text, add_to_modlog_activitypub, joined_communities, \
|
||||
moderating_communities
|
||||
moderating_communities, get_task_session
|
||||
|
||||
from sqlalchemy import or_
|
||||
|
||||
|
@ -468,7 +468,8 @@ def refresh_user_profile(user_id):
|
|||
|
||||
@celery.task
|
||||
def refresh_user_profile_task(user_id):
|
||||
user = User.query.get(user_id)
|
||||
session = get_task_session()
|
||||
user: User = session.query(User).get(user_id)
|
||||
if user and user.instance_id and user.instance.online():
|
||||
try:
|
||||
actor_data = get_request(user.ap_public_url, headers={'Accept': 'application/activity+json'})
|
||||
|
@ -480,7 +481,7 @@ def refresh_user_profile_task(user_id):
|
|||
return
|
||||
except:
|
||||
try:
|
||||
site = Site.query.get(1)
|
||||
site = session.query(Site).get(1)
|
||||
actor_data = signed_get_request(user.ap_public_url, site.private_key,
|
||||
f"https://{current_app.config['SERVER_NAME']}/actor#main-key")
|
||||
except:
|
||||
|
@ -492,9 +493,9 @@ def refresh_user_profile_task(user_id):
|
|||
# update indexible state on their posts, if necessary
|
||||
new_indexable = activity_json['indexable'] if 'indexable' in activity_json else True
|
||||
if new_indexable != user.indexable:
|
||||
db.session.execute(text('UPDATE "post" set indexable = :indexable WHERE user_id = :user_id'),
|
||||
{'user_id': user.id,
|
||||
'indexable': new_indexable})
|
||||
session.execute(text('UPDATE "post" set indexable = :indexable WHERE user_id = :user_id'),
|
||||
{'user_id': user.id,
|
||||
'indexable': new_indexable})
|
||||
|
||||
user.user_name = activity_json['preferredUsername'].strip()
|
||||
if 'name' in activity_json:
|
||||
|
@ -531,7 +532,7 @@ def refresh_user_profile_task(user_id):
|
|||
if not user.avatar_id or (user.avatar_id and icon_entry != user.avatar.source_url):
|
||||
avatar = File(source_url=icon_entry)
|
||||
user.avatar = avatar
|
||||
db.session.add(avatar)
|
||||
session.add(avatar)
|
||||
avatar_changed = True
|
||||
if 'image' in activity_json:
|
||||
if user.cover_id and activity_json['image']['url'] != user.cover.source_url:
|
||||
|
@ -539,10 +540,11 @@ def refresh_user_profile_task(user_id):
|
|||
if not user.cover_id or (user.cover_id and activity_json['image']['url'] != user.cover.source_url):
|
||||
cover = File(source_url=activity_json['image']['url'])
|
||||
user.cover = cover
|
||||
db.session.add(cover)
|
||||
session.add(cover)
|
||||
cover_changed = True
|
||||
user.recalculate_post_stats()
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
session.close()
|
||||
if user.avatar_id and avatar_changed:
|
||||
make_image_sizes(user.avatar_id, 40, 250, 'users')
|
||||
cache.delete_memoized(User.avatar_image, user)
|
||||
|
@ -561,7 +563,8 @@ def refresh_community_profile(community_id):
|
|||
|
||||
@celery.task
|
||||
def refresh_community_profile_task(community_id):
|
||||
community = Community.query.get(community_id)
|
||||
session = get_task_session()
|
||||
community: Community = session.query(Community).get(community_id)
|
||||
if community and community.instance.online() and not community.is_local():
|
||||
try:
|
||||
actor_data = get_request(community.ap_public_url, headers={'Accept': 'application/activity+json'})
|
||||
|
@ -629,7 +632,7 @@ def refresh_community_profile_task(community_id):
|
|||
if not community.icon_id or (community.icon_id and icon_entry != community.icon.source_url):
|
||||
icon = File(source_url=icon_entry)
|
||||
community.icon = icon
|
||||
db.session.add(icon)
|
||||
session.add(icon)
|
||||
icon_changed = True
|
||||
if 'image' in activity_json:
|
||||
if isinstance(activity_json['image'], dict) and 'url' in activity_json['image']:
|
||||
|
@ -644,7 +647,7 @@ def refresh_community_profile_task(community_id):
|
|||
if not community.image_id or (community.image_id and image_entry != community.image.source_url):
|
||||
image = File(source_url=image_entry)
|
||||
community.image = image
|
||||
db.session.add(image)
|
||||
session.add(image)
|
||||
cover_changed = True
|
||||
if 'language' in activity_json and isinstance(activity_json['language'], list) and not community.ignore_remote_language:
|
||||
for ap_language in activity_json['language']:
|
||||
|
@ -654,7 +657,8 @@ def refresh_community_profile_task(community_id):
|
|||
instance = Instance.query.get(community.instance_id)
|
||||
if instance and instance.software == 'peertube':
|
||||
community.restricted_to_mods = True
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
session.close()
|
||||
if community.icon_id and icon_changed:
|
||||
make_image_sizes(community.icon_id, 60, 250, 'communities')
|
||||
if community.image_id and cover_changed:
|
||||
|
@ -988,7 +992,8 @@ def make_image_sizes(file_id, thumbnail_width=50, medium_width=120, directory='p
|
|||
|
||||
@celery.task
|
||||
def make_image_sizes_async(file_id, thumbnail_width, medium_width, directory, toxic_community):
|
||||
file = File.query.get(file_id)
|
||||
session = get_task_session()
|
||||
file: File = session.query(File).get(file_id)
|
||||
if file and file.source_url:
|
||||
# Videos
|
||||
if file.source_url.endswith('.mp4') or file.source_url.endswith('.webm'):
|
||||
|
@ -1028,7 +1033,7 @@ def make_image_sizes_async(file_id, thumbnail_width, medium_width, directory, to
|
|||
file.thumbnail_width = image.width
|
||||
file.thumbnail_height = image.height
|
||||
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
|
||||
# Images
|
||||
else:
|
||||
|
@ -1110,7 +1115,7 @@ def make_image_sizes_async(file_id, thumbnail_width, medium_width, directory, to
|
|||
file.thumbnail_width = image.width
|
||||
file.thumbnail_height = image.height
|
||||
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
|
||||
# Alert regarding fascist meme content
|
||||
if toxic_community and img_width < 2000: # images > 2000px tend to be real photos instead of 4chan screenshots.
|
||||
|
@ -1124,8 +1129,8 @@ def make_image_sizes_async(file_id, thumbnail_width, medium_width, directory, to
|
|||
user_id=1,
|
||||
author_id=post.user_id,
|
||||
url=url_for('activitypub.post_ap', post_id=post.id))
|
||||
db.session.add(notification)
|
||||
db.session.commit()
|
||||
session.add(notification)
|
||||
session.commit()
|
||||
|
||||
|
||||
def find_reply_parent(in_reply_to: str) -> Tuple[int, int, int]:
|
||||
|
@ -1216,7 +1221,8 @@ def new_instance_profile(instance_id: int):
|
|||
|
||||
@celery.task
|
||||
def new_instance_profile_task(instance_id: int):
|
||||
instance = Instance.query.get(instance_id)
|
||||
session = get_task_session()
|
||||
instance: Instance = session.query(Instance).get(instance_id)
|
||||
try:
|
||||
instance_data = get_request(f"https://{instance.domain}", headers={'Accept': 'application/activity+json'})
|
||||
except:
|
||||
|
@ -1233,7 +1239,7 @@ def new_instance_profile_task(instance_id: int):
|
|||
else: # it's pretty much always /inbox so just assume that it is for whatever this instance is running
|
||||
instance.inbox = f"https://{instance.domain}/inbox"
|
||||
instance.updated_at = utcnow()
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
|
||||
# retrieve list of Admins from /api/v3/site, update InstanceRole
|
||||
try:
|
||||
|
@ -1257,20 +1263,20 @@ def new_instance_profile_task(instance_id: int):
|
|||
user = find_actor_or_create(admin['person']['actor_id'])
|
||||
if user and not instance.user_is_admin(user.id):
|
||||
new_instance_role = InstanceRole(instance_id=instance.id, user_id=user.id, role='admin')
|
||||
db.session.add(new_instance_role)
|
||||
db.session.commit()
|
||||
session.add(new_instance_role)
|
||||
session.commit()
|
||||
# remove any InstanceRoles that are no longer part of instance-data['admins']
|
||||
for instance_admin in InstanceRole.query.filter_by(instance_id=instance.id):
|
||||
if instance_admin.user.profile_id() not in admin_profile_ids:
|
||||
db.session.query(InstanceRole).filter(
|
||||
session.query(InstanceRole).filter(
|
||||
InstanceRole.user_id == instance_admin.user.id,
|
||||
InstanceRole.instance_id == instance.id,
|
||||
InstanceRole.role == 'admin').delete()
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
elif instance_data.status_code == 406 or instance_data.status_code == 404: # Mastodon and PeerTube do 406, a.gup.pe does 404
|
||||
instance.inbox = f"https://{instance.domain}/inbox"
|
||||
instance.updated_at = utcnow()
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
|
||||
headers = {'User-Agent': 'PieFed/1.0', 'Accept': 'application/activity+json'}
|
||||
try:
|
||||
|
@ -1291,12 +1297,13 @@ def new_instance_profile_task(instance_id: int):
|
|||
instance.software = node_json['software']['name'].lower()
|
||||
instance.version = node_json['software']['version']
|
||||
instance.nodeinfo_href = links['href']
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
break # most platforms (except Lemmy v0.19.4) that provide 2.1 also provide 2.0 - there's no need to check both
|
||||
except:
|
||||
return
|
||||
except:
|
||||
return
|
||||
session.close()
|
||||
|
||||
|
||||
# alter the effect of upvotes based on their instance. Default to 1.0
|
||||
|
|
|
@ -20,7 +20,7 @@ from app.models import Community, File, BannedInstances, PostReply, Post, utcnow
|
|||
from app.utils import get_request, gibberish, markdown_to_html, domain_from_url, \
|
||||
is_image_url, ensure_directory_exists, shorten_string, \
|
||||
remove_tracking_from_link, ap_datetime, instance_banned, blocked_phrases, url_to_thumbnail_file, opengraph_parse, \
|
||||
piefed_markdown_to_lemmy_markdown
|
||||
piefed_markdown_to_lemmy_markdown, get_task_session
|
||||
from sqlalchemy import func, desc, text
|
||||
import os
|
||||
|
||||
|
@ -734,9 +734,10 @@ def send_to_remote_instance(instance_id: int, community_id: int, payload):
|
|||
|
||||
@celery.task
|
||||
def send_to_remote_instance_task(instance_id: int, community_id: int, payload):
|
||||
community = Community.query.get(community_id)
|
||||
session = get_task_session()
|
||||
community: Community = session.query(Community).get(community_id)
|
||||
if community:
|
||||
instance = Instance.query.get(instance_id)
|
||||
instance: Instance = session.query(Instance).get(instance_id)
|
||||
if instance.inbox and instance.online() and not instance_banned(instance.domain):
|
||||
if post_request(instance.inbox, payload, community.private_key, community.ap_profile_id + '#main-key') is True:
|
||||
instance.last_successful_send = utcnow()
|
||||
|
@ -747,7 +748,8 @@ def send_to_remote_instance_task(instance_id: int, community_id: int, payload):
|
|||
instance.start_trying_again = utcnow() + timedelta(seconds=instance.failures ** 4)
|
||||
if instance.failures > 10:
|
||||
instance.dormant = True
|
||||
db.session.commit()
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
|
||||
def community_in_list(community_id, community_list):
|
||||
|
|
|
@ -28,6 +28,7 @@ from flask import current_app, json, redirect, url_for, request, make_response,
|
|||
from flask_babel import _
|
||||
from flask_login import current_user, logout_user
|
||||
from sqlalchemy import text, or_
|
||||
from sqlalchemy.orm import Session
|
||||
from wtforms.fields import SelectField, SelectMultipleField
|
||||
from wtforms.widgets import Select, html_params, ListWidget, CheckboxInput
|
||||
from app import db, cache, httpx_client
|
||||
|
@ -1267,3 +1268,9 @@ def authorise_api_user(auth, return_type=None, id_match=None):
|
|||
def community_ids_from_instances(instance_ids) -> List[int]:
|
||||
communities = Community.query.join(Instance, Instance.id == Community.instance_id).filter(Instance.id.in_(instance_ids))
|
||||
return [community.id for community in communities]
|
||||
|
||||
|
||||
# Set up a new SQLAlchemy session specifically for Celery tasks
|
||||
def get_task_session() -> Session:
|
||||
# Use the same engine as the main app, but create an independent session
|
||||
return Session(bind=db.engine)
|
||||
|
|
Loading…
Add table
Reference in a new issue