mirror of
https://codeberg.org/rimu/pyfedi
synced 2025-01-24 03:43:42 -08:00
92 lines
2.9 KiB
Python
92 lines
2.9 KiB
Python
from app import db, cache
|
|
from app.constants import ROLE_STAFF, ROLE_ADMIN
|
|
from app.models import UserBlock
|
|
from app.utils import authorise_api_user, blocked_users
|
|
|
|
from flask import flash
|
|
from flask_babel import _
|
|
from flask_login import current_user
|
|
|
|
from sqlalchemy import text
|
|
|
|
# would be in app/constants.py
# Request-origin codes, passed as the `src` argument of the functions in this module.
#   SRC_WEB: request made through the web UI (acting user comes from current_user)
#   SRC_PUB: not referenced in the visible code -- presumably ActivityPub; TODO confirm
#   SRC_API: request made through the API (acting user comes from authorise_api_user)
SRC_WEB, SRC_PUB, SRC_API = 1, 2, 3
|
|
|
|
# only called from API for now, but can be called from web using [un]block_another_user(user.id, SRC_WEB)
|
|
|
|
# user_id: the local, logged-in user
|
|
# person_id: the person they want to block
|
|
|
|
def block_another_user(person_id, src, auth=None):
    """Block ``person_id`` on behalf of the acting local user.

    The acting user is resolved with ``authorise_api_user(auth)`` when ``src``
    is ``SRC_API``; for any other ``src`` it is ``current_user``.

    Args:
        person_id: ID of the user to be blocked.
        src: Origin of the request (one of the ``SRC_*`` constants). Selects
            how the acting user is resolved and how a rejection is reported
            (exception for the API, flash message otherwise).
        auth: Passed through to ``authorise_api_user``; only read when ``src``
            is ``SRC_API``.

    Returns:
        The acting user's ID when ``src`` is ``SRC_API``; otherwise ``None``
        (the caller handles the confirmation flash message and redirect).

    Raises:
        Exception: With message ``'cannot_block_self'`` or
            ``'cannot_block_admin_or_staff'`` when ``src`` is ``SRC_API`` and
            the request is rejected. Anything raised by
            ``authorise_api_user`` propagates unchanged.
    """
    # Authorisation errors are deliberately left to propagate to the caller.
    user_id = authorise_api_user(auth) if src == SRC_API else current_user.id

    if user_id == person_id:
        if src == SRC_API:
            raise Exception('cannot_block_self')
        flash(_('You cannot block yourself.'), 'error')
        return

    # The query is not limited to one row, so inspect every role it returns;
    # .scalar() would only look at the first row and could miss an admin/staff role.
    role_rows = db.session.execute(text('SELECT role_id FROM "user_role" WHERE user_id = :person_id'),
                                   {'person_id': person_id}).fetchall()
    if any(row[0] in (ROLE_ADMIN, ROLE_STAFF) for row in role_rows):
        if src == SRC_API:
            raise Exception('cannot_block_admin_or_staff')
        flash(_('You cannot block admin or staff.'), 'error')
        return

    existing_block = UserBlock.query.filter_by(blocker_id=user_id, blocked_id=person_id).first()
    if not existing_block:
        db.session.add(UserBlock(blocker_id=user_id, blocked_id=person_id))
        # Remove notification_subscription rows whose entity_id is the blocker and whose
        # user_id is the blocked person.
        # NOTE(review): there is no filter on the kind of entity here -- confirm entity_id
        # can only refer to a user for the rows this is meant to remove.
        db.session.execute(text('DELETE FROM "notification_subscription" WHERE entity_id = :current_user AND user_id = :user_id'),
                           {'current_user': user_id, 'user_id': person_id})
        db.session.commit()

        # The blocker's memoized block list is now out of date.
        cache.delete_memoized(blocked_users, user_id)

        # Nothing to fed? (Lemmy doesn't federate anything to the blocked person)

    if src == SRC_API:
        return user_id
    return  # let calling function handle confirmation flash message and redirect
|
|
|
|
|
|
def unblock_another_user(person_id, src, auth=None):
    """Remove the acting local user's block on ``person_id``, if one exists.

    The acting user is resolved with ``authorise_api_user(auth)`` when ``src``
    is ``SRC_API``; for any other ``src`` it is ``current_user``.

    Args:
        person_id: ID of the user to be unblocked.
        src: Origin of the request (one of the ``SRC_*`` constants). Selects
            how the acting user is resolved and how a rejection is reported
            (exception for the API, flash message otherwise).
        auth: Passed through to ``authorise_api_user``; only read when ``src``
            is ``SRC_API``.

    Returns:
        The acting user's ID when ``src`` is ``SRC_API``; otherwise ``None``
        (the caller handles the confirmation flash message and redirect).

    Raises:
        Exception: With message ``'cannot_unblock_self'`` when ``src`` is
            ``SRC_API`` and ``person_id`` is the acting user. Anything raised
            by ``authorise_api_user`` propagates unchanged.
    """
    # Authorisation errors are deliberately left to propagate to the caller.
    user_id = authorise_api_user(auth) if src == SRC_API else current_user.id

    if user_id == person_id:
        if src == SRC_API:
            raise Exception('cannot_unblock_self')
        flash(_('You cannot unblock yourself.'), 'error')
        return

    existing_block = UserBlock.query.filter_by(blocker_id=user_id, blocked_id=person_id).first()
    if existing_block:
        db.session.delete(existing_block)
        db.session.commit()

        # The blocker's memoized block list is now out of date.
        cache.delete_memoized(blocked_users, user_id)

        # Nothing to fed? (Lemmy doesn't federate anything to the unblocked person)

    if src == SRC_API:
        return user_id
    return  # let calling function handle confirmation flash message and redirect
|