From ffd78bfd80ecb5ab33ee37084e75ffe50f91293f Mon Sep 17 00:00:00 2001 From: rimu <3310831+rimu@users.noreply.github.com> Date: Thu, 14 Nov 2024 20:16:09 +1300 Subject: [PATCH] add unique constraint to user.ap_profile_id and clean up old data --- app/activitypub/util.py | 18 ++- app/models.py | 2 +- .../26138ecda7c3_unique_user_ap_profile_id.py | 106 ++++++++++++++++++ 3 files changed, 121 insertions(+), 5 deletions(-) create mode 100644 migrations/versions/26138ecda7c3_unique_user_ap_profile_id.py diff --git a/app/activitypub/util.py b/app/activitypub/util.py index b706b59c..ed39fddd 100644 --- a/app/activitypub/util.py +++ b/app/activitypub/util.py @@ -12,6 +12,8 @@ from flask import current_app, request, g, url_for, json from flask_babel import _ from requests import JSONDecodeError from sqlalchemy import text, func, desc +from sqlalchemy.exc import IntegrityError + from app import db, cache, constants, celery from app.models import User, Post, Community, BannedInstances, File, PostReply, AllowedInstances, Instance, utcnow, \ PostVote, PostReplyVote, ActivityPubLog, Notification, Site, CommunityMember, InstanceRole, Report, Conversation, \ @@ -765,8 +767,12 @@ def actor_json_to_model(activity_json, address, server): cover = File(source_url=activity_json['image']['url']) user.cover = cover db.session.add(cover) - db.session.add(user) - db.session.commit() + try: + db.session.add(user) + db.session.commit() + except IntegrityError: + db.session.rollback() + return User.query.filter_by(ap_profile_id=activity_json['id'].lower()).one() if user.avatar_id: make_image_sizes(user.avatar_id, 40, 250, 'users') if user.cover_id: @@ -860,8 +866,12 @@ def actor_json_to_model(activity_json, address, server): if 'language' in activity_json and isinstance(activity_json['language'], list): for ap_language in activity_json['language']: community.languages.append(find_language_or_create(ap_language['identifier'], ap_language['name'])) - db.session.add(community) - db.session.commit() + try: + db.session.add(community) + db.session.commit() + except IntegrityError: + db.session.rollback() + return Community.query.filter_by(ap_profile_id=activity_json['id'].lower()).one() if community.icon_id: make_image_sizes(community.icon_id, 60, 250, 'communities') if community.image_id: diff --git a/app/models.py b/app/models.py index c9293bd7..642f4062 100644 --- a/app/models.py +++ b/app/models.py @@ -697,7 +697,7 @@ class User(UserMixin, db.Model): conversations = db.relationship('Conversation', lazy='dynamic', secondary=conversation_member, backref=db.backref('members', lazy='joined')) ap_id = db.Column(db.String(255), index=True) # e.g. username@server - ap_profile_id = db.Column(db.String(255), index=True) # e.g. https://server/u/username + ap_profile_id = db.Column(db.String(255), index=True, unique=True) # e.g. https://server/u/username ap_public_url = db.Column(db.String(255)) # e.g. https://server/u/UserName ap_fetched_at = db.Column(db.DateTime) ap_followers_url = db.Column(db.String(255)) diff --git a/migrations/versions/26138ecda7c3_unique_user_ap_profile_id.py b/migrations/versions/26138ecda7c3_unique_user_ap_profile_id.py new file mode 100644 index 00000000..5931bddf --- /dev/null +++ b/migrations/versions/26138ecda7c3_unique_user_ap_profile_id.py @@ -0,0 +1,106 @@ +"""unique user ap profile id + +Revision ID: 26138ecda7c3 +Revises: a4debcf5ac6f +Create Date: 2024-11-14 19:28:59.596757 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy import text + + +# revision identifiers, used by Alembic. +revision = '26138ecda7c3' +down_revision = 'a4debcf5ac6f' +branch_labels = None +depends_on = None + + +def upgrade(): + # Find duplicate users by ap_profile_id + dupes_query = text(''' + SELECT id FROM "user" + WHERE ap_profile_id IN ( + SELECT ap_profile_id FROM "user" + GROUP BY ap_profile_id + HAVING COUNT(*) > 1 + ) + ''') + + conn = op.get_bind() + dupes = conn.execute(dupes_query).scalars() + print('Cleaning up duplicate users, this may take a while...') + for d in dupes: + user_query = text('SELECT id, ap_profile_id FROM "user" WHERE id = :id') + user = conn.execute(user_query, {"id": d}).first() + + if not user: + continue + + # Find users with the same ap_profile_id + users_query = text(''' + SELECT id FROM "user" + WHERE ap_profile_id = :ap_profile_id + ORDER BY id + ''') + users = conn.execute(users_query, {"ap_profile_id": user.ap_profile_id}).fetchall() + + first = True + new_id = None + + for u in users: + if first: + first = False + new_id = u.id + continue + + if new_id: + old_id = u.id + + # Update references in various tables + conn.execute(text('UPDATE "post" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "post_vote" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "post_vote" SET author_id = :new_id WHERE author_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "post_reply" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "post_reply_vote" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "post_reply_vote" SET author_id = :new_id WHERE author_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "notification" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "notification" SET author_id = :new_id WHERE author_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "notification_subscription" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "community_member" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('DELETE FROM "instance_role" WHERE user_id = :old_id'), {"old_id": old_id}) + conn.execute(text('UPDATE "chat_message" SET sender_id = :new_id WHERE sender_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "chat_message" SET recipient_id = :new_id WHERE recipient_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "community" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "domain_block" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "community_block" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "user_follower" SET local_user_id = :new_id WHERE local_user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "user_follower" SET remote_user_id = :new_id WHERE remote_user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "community_ban" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "user_note" SET target_id = :new_id WHERE target_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "user_block" SET blocked_id = :new_id WHERE blocked_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "community_join_request" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "filter" SET user_id = :new_id WHERE user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "report" SET reporter_id = :new_id WHERE reporter_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "report" SET suspect_user_id = :new_id WHERE suspect_user_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + conn.execute(text('UPDATE "user_follow_request" SET follow_id = :new_id WHERE follow_id = :old_id'), {"new_id": new_id, "old_id": old_id}) + + # Delete the duplicate user + conn.execute(text('DELETE FROM "user" WHERE id = :old_id'), {"old_id": old_id}) + + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('user', schema=None) as batch_op: + batch_op.drop_index('ix_user_ap_profile_id') + batch_op.create_index(batch_op.f('ix_user_ap_profile_id'), ['ap_profile_id'], unique=True) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('user', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_user_ap_profile_id')) + batch_op.create_index('ix_user_ap_profile_id', ['ap_profile_id'], unique=False) + + # ### end Alembic commands ###