adding bans import side

This commit is contained in:
Alan Roberts 2024-09-26 14:46:04 -04:00
parent d3198016db
commit d046df22e3
4 changed files with 153 additions and 14 deletions

View file

@ -58,6 +58,7 @@ class PreLoadCommunitiesForm(FlaskForm):
class ImportExportBannedListsForm(FlaskForm):
import_file = FileField(_l('Import Ban Lists'))
import_submit = SubmitField(_l('Import'))
export_submit = SubmitField(_l('Export'))

View file

@ -32,7 +32,7 @@ from app.models import AllowedInstances, BannedInstances, ActivityPubLog, utcnow
Tag
from app.utils import render_template, permission_required, set_setting, get_setting, gibberish, markdown_to_html, \
moderating_communities, joined_communities, finalize_user_setup, theme_list, blocked_phrases, blocked_referrers, \
topic_tree, languages_for_form, menu_topics, ensure_directory_exists, add_to_modlog, get_request
topic_tree, languages_for_form, menu_topics, ensure_directory_exists, add_to_modlog, get_request, file_get_contents
from app.admin import bp
@ -318,7 +318,23 @@ def admin_federation():
# this is the import bans button
elif ban_lists_form.import_submit.data and ban_lists_form.validate():
flash(_('import button clicked'))
import_file = request.files['import_file']
if import_file and import_file.filename != '':
file_ext = os.path.splitext(import_file.filename)[1]
if file_ext.lower() != '.json':
abort(400)
new_filename = gibberish(15) + '.json'
directory = f'app/static/media/'
# save the file
final_place = os.path.join(directory, new_filename + file_ext)
import_file.save(final_place)
# import bans in background task
import_bans(final_place)
flash(_(f'import file: {import_file}'))
return redirect(url_for('admin.admin_federation'))
# this is the export bans button
@ -329,32 +345,36 @@ def admin_federation():
# get banned_instances info
banned_instances = []
instance_bans = BannedInstances.query.all()
for bi in instance_bans:
banned_instances.append(bi.domain)
if len(instance_bans) > 0:
for bi in instance_bans:
banned_instances.append(bi.domain)
ban_lists_dict['banned_instances'] = banned_instances
# get banned_domains info
banned_domains = []
domain_bans = Domain.query.filter_by(banned=True).all()
for db in domain_bans:
banned_domains.append(db.name)
if len(domain_bans) > 0:
for db in domain_bans:
banned_domains.append(db.name)
ban_lists_dict['banned_domains'] = banned_domains
# get banned_tags info
banned_tags = []
tag_bans = Tag.query.filter_by(banned=True).all()
for tb in tag_bans:
tag_dict = {}
tag_dict['name'] = tb.name
tag_dict['display_as'] = tb.display_as
banned_tags.append(tag_dict)
if len(tag_bans) > 0:
for tb in tag_bans:
tag_dict = {}
tag_dict['name'] = tb.name
tag_dict['display_as'] = tb.display_as
banned_tags.append(tag_dict)
ban_lists_dict['banned_tags'] = banned_tags
# get banned_users info
banned_users = []
user_bans = User.query.filter_by(banned=True).all()
for ub in user_bans:
banned_users.append(ub.ap_id)
if len(user_bans) > 0:
for ub in user_bans:
banned_users.append(ub.ap_id)
ban_lists_dict['banned_users'] = banned_users
# setup the BytesIO buffer
@ -415,6 +435,121 @@ def admin_federation():
)
def import_bans(filename):
if current_app.debug:
import_bans_task(filename)
else:
import_bans_task.delay(filename)
@celery.task
def import_bans_task(filename):
contents = file_get_contents(filename)
contents_json = json.loads(contents)
# import allowed_instances
if get_setting('use_allowlist'):
# check for allowed_instances existing and being more than 0 entries
instances_allowed = contents_json['allowed_instances']
if instance_allowed.isinstance(list) and len(instance_allowed) > 0:
# get the existing allows and their domains
already_allowed_instances = []
already_allowed = AllowedInstances.query.all()
if len(already_allowed) > 0:
for aa in already_allowed:
already_allowed_instances.append(aa.domain)
# loop through the instances_allowed
for ia in instances_allowed:
# check if we have already allowed this instance
if ia in already_allowed_instances:
continue
else:
# allow the instance
db.session.add(AllowedInstances(domain=ia))
# import banned_instances
else:
# check for banned_instances existing and being more than 0 entries
instance_bans = contents_json['banned_instances']
if instance_bans.isinstance(list) and len(instance_bans) > 0:
# get the existing bans and their domains
already_banned_instances = []
already_banned = BannedInstances.query.all()
if len(already_banned) > 0:
for ab in already_banned:
already_banned_instances.append(ab.domain)
# loop through the instance_bans
for ib in instance_bans:
# check if we have already banned this instance
if ib in already_banned_instances:
continue
else:
# ban the domain
db.session.add(BannedInstances(domain=ib))
# import banned_domains
# check for banned_domains existing and being more than 0 entries
domain_bans = contents_json['banned_domains']
if domain_bans.isinstance(list) and len(domain_bans) > 0:
# get the existing bans and their domains
already_banned_domains = []
already_banned = Domain.query.filter_by(banned=True).all()
if len(already_banned) > 0:
for ab in already_banned:
already_banned_domains.append(ab.name)
# loop through the domain_bans
for domb in domain_bans:
# check if we have already banned this domain
if domb in already_banned_domains:
continue
else:
# ban the domain
db.session.add(Domain(name=domb, banned=True))
# import banned_tags
# check for banned_tags existing and being more than 0 entries
tag_bans = contents_json['banned_tags']
if tag_bans.isinstance(list) and len(tag_bans) > 0:
# get the existing bans and their domains
already_banned_tags = []
already_banned = Tag.query.filter_by(banned=True).all()
if len(already_banned) > 0:
for ab in already_banned:
already_banned_tags.append(ab.name)
# loop through the tag_bans
for tb in tag_bans:
# check if we have already banned this tag
if tb['name'] in already_banned_domains:
continue
else:
# ban the domain
db.session.add(Tag(name=tb['name'], display_as=tb['display_as'], banned=True))
# import banned_users
# check for banned_users existing and being more than 0 entries
user_bans = contents_json['banned_users']
if user_bans.isinstance(list) and len(user_bans) > 0:
# get the existing bans and their domains
already_banned_users = []
already_banned = User.query.filter_by(banned=True).all()
if len(already_banned) > 0:
for ab in already_banned:
already_banned_users.append(ab.ap_id)
# loop through the user_bans
for ub in user_bans:
# check if we have already banned this user
if ub in already_banned_domains:
continue
else:
# ban the user
# db.session.add(Tag(name=tb['name'], display_as=tb['display_as'], banned=True))
pass
@bp.route('/activities', methods=['GET'])
@login_required
@permission_required('change instance settings')

View file

@ -174,7 +174,7 @@ def tag_ban(tag):
if tag:
tag.banned = True
db.session.commit()
tag.purge_content()
# tag.purge_content()
flash(_('%(name)s banned for all users and all content deleted.', name=tag.name))
return redirect(url_for('tag.tags'))

View file

@ -22,11 +22,14 @@
<pre><code>
{
"banned_instances": ["banned1.social", "banned2.social"],
"allowed_instances": ["allowed1.social", "allowed2.social"],
"banned_domains": ["banned3.social"],
"banned_tags": [{"name":"badtag","display_as":"BaDtAg"},{...}],
"banned_users": ["baduser@banned4.social"]
}</code>
</pre>
<p>Note: only one of "banned_instances" or "allowed_instances" will be populated.</p>
<!-- {{ render_field(ban_lists_form.import_submit) }} -->
<!-- {{ render_field(ban_lists_form.export_submit) }} -->
{{ render_form(ban_lists_form) }}