add unique constraint to user.ap_profile_id and clean up old data

This commit is contained in:
rimu 2024-11-14 20:16:09 +13:00
parent f16b23bcd8
commit ffd78bfd80
3 changed files with 121 additions and 5 deletions

View file

@ -12,6 +12,8 @@ from flask import current_app, request, g, url_for, json
from flask_babel import _ from flask_babel import _
from requests import JSONDecodeError from requests import JSONDecodeError
from sqlalchemy import text, func, desc from sqlalchemy import text, func, desc
from sqlalchemy.exc import IntegrityError
from app import db, cache, constants, celery from app import db, cache, constants, celery
from app.models import User, Post, Community, BannedInstances, File, PostReply, AllowedInstances, Instance, utcnow, \ from app.models import User, Post, Community, BannedInstances, File, PostReply, AllowedInstances, Instance, utcnow, \
PostVote, PostReplyVote, ActivityPubLog, Notification, Site, CommunityMember, InstanceRole, Report, Conversation, \ PostVote, PostReplyVote, ActivityPubLog, Notification, Site, CommunityMember, InstanceRole, Report, Conversation, \
@ -765,8 +767,12 @@ def actor_json_to_model(activity_json, address, server):
cover = File(source_url=activity_json['image']['url']) cover = File(source_url=activity_json['image']['url'])
user.cover = cover user.cover = cover
db.session.add(cover) db.session.add(cover)
db.session.add(user) try:
db.session.commit() db.session.add(user)
db.session.commit()
except IntegrityError:
db.session.rollback()
return User.query.filter_by(ap_profile_id=activity_json['id'].lower()).one()
if user.avatar_id: if user.avatar_id:
make_image_sizes(user.avatar_id, 40, 250, 'users') make_image_sizes(user.avatar_id, 40, 250, 'users')
if user.cover_id: if user.cover_id:
@ -860,8 +866,12 @@ def actor_json_to_model(activity_json, address, server):
if 'language' in activity_json and isinstance(activity_json['language'], list): if 'language' in activity_json and isinstance(activity_json['language'], list):
for ap_language in activity_json['language']: for ap_language in activity_json['language']:
community.languages.append(find_language_or_create(ap_language['identifier'], ap_language['name'])) community.languages.append(find_language_or_create(ap_language['identifier'], ap_language['name']))
db.session.add(community) try:
db.session.commit() db.session.add(community)
db.session.commit()
except IntegrityError:
db.session.rollback()
return Community.query.filter_by(ap_profile_id=activity_json['id'].lower()).one()
if community.icon_id: if community.icon_id:
make_image_sizes(community.icon_id, 60, 250, 'communities') make_image_sizes(community.icon_id, 60, 250, 'communities')
if community.image_id: if community.image_id:

View file

@ -697,7 +697,7 @@ class User(UserMixin, db.Model):
conversations = db.relationship('Conversation', lazy='dynamic', secondary=conversation_member, backref=db.backref('members', lazy='joined')) conversations = db.relationship('Conversation', lazy='dynamic', secondary=conversation_member, backref=db.backref('members', lazy='joined'))
ap_id = db.Column(db.String(255), index=True) # e.g. username@server ap_id = db.Column(db.String(255), index=True) # e.g. username@server
ap_profile_id = db.Column(db.String(255), index=True) # e.g. https://server/u/username ap_profile_id = db.Column(db.String(255), index=True, unique=True) # e.g. https://server/u/username
ap_public_url = db.Column(db.String(255)) # e.g. https://server/u/UserName ap_public_url = db.Column(db.String(255)) # e.g. https://server/u/UserName
ap_fetched_at = db.Column(db.DateTime) ap_fetched_at = db.Column(db.DateTime)
ap_followers_url = db.Column(db.String(255)) ap_followers_url = db.Column(db.String(255))

View file

@ -0,0 +1,106 @@
"""unique user ap profile id
Revision ID: 26138ecda7c3
Revises: a4debcf5ac6f
Create Date: 2024-11-14 19:28:59.596757
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text
# revision identifiers, used by Alembic.
revision = '26138ecda7c3'
down_revision = 'a4debcf5ac6f'
branch_labels = None
depends_on = None
def upgrade():
# Find duplicate users by ap_profile_id
dupes_query = text('''
SELECT id FROM "user"
WHERE ap_profile_id IN (
SELECT ap_profile_id FROM "user"
GROUP BY ap_profile_id
HAVING COUNT(*) > 1
)
''')
conn = op.get_bind()
dupes = conn.execute(dupes_query).scalars()
print('Cleaning up duplicate users, this may take a while...')
for d in dupes:
user_query = text('SELECT id, ap_profile_id FROM "user" WHERE id = :id')
user = conn.execute(user_query, {"id": d}).first()
if not user:
continue
# Find users with the same ap_profile_id
users_query = text('''
SELECT id FROM "user"
WHERE ap_profile_id = :ap_profile_id
ORDER BY id
''')
users = conn.execute(users_query, {"ap_profile_id": user.ap_profile_id}).fetchall()
first = True
new_id = None
for u in users:
if first:
first = False
new_id = u.id
continue
if new_id:
old_id = u.id
# Update references in various tables
conn.execute(text('UPDATE "post" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "post_vote" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "post_vote" SET author_id = :new_id WHERE author_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "post_reply" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "post_reply_vote" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "post_reply_vote" SET author_id = :new_id WHERE author_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "notification" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "notification" SET author_id = :new_id WHERE author_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "notification_subscription" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "community_member" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('DELETE FROM "instance_role" WHERE user_id = :old_id'), {"old_id": old_id})
conn.execute(text('UPDATE "chat_message" SET sender_id = :new_id WHERE sender_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "chat_message" SET recipient_id = :new_id WHERE recipient_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "community" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "domain_block" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "community_block" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "user_follower" SET local_user_id = :new_id WHERE local_user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "user_follower" SET remote_user_id = :new_id WHERE remote_user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "community_ban" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "user_note" SET target_id = :new_id WHERE target_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "user_block" SET blocked_id = :new_id WHERE blocked_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "community_join_request" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "filter" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "report" SET reporter_id = :new_id WHERE reporter_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "report" SET suspect_user_id = :new_id WHERE suspect_user_id = :old_id'), {"new_id": new_id, "old_id": old_id})
conn.execute(text('UPDATE "user_follow_request" SET follow_id = :new_id WHERE follow_id = :old_id'), {"new_id": new_id, "old_id": old_id})
# Delete the duplicate user
conn.execute(text('DELETE FROM "user" WHERE id = :old_id'), {"old_id": old_id})
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('user', schema=None) as batch_op:
batch_op.drop_index('ix_user_ap_profile_id')
batch_op.create_index(batch_op.f('ix_user_ap_profile_id'), ['ap_profile_id'], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('user', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_user_ap_profile_id'))
batch_op.create_index('ix_user_ap_profile_id', ['ap_profile_id'], unique=False)
# ### end Alembic commands ###