pyfedi/app/cli.py
2024-02-10 12:22:16 +13:00

199 lines
9.4 KiB
Python

# if commands in this file are not working (e.g. 'flask translate') make sure you set the FLASK_APP environment variable.
# e.g. export FLASK_APP=pyfedi.py
from datetime import datetime, timedelta
from flask import json
from app import db
import click
import os
from app.activitypub.signature import RsaKeys
from app.auth.util import random_token
from app.email import send_verification_email
from app.models import Settings, BannedInstances, Interest, Role, User, RolePermission, Domain, ActivityPubLog, \
utcnow, Site, Instance, File
from app.utils import file_get_contents, retrieve_block_list
def register(app):
@app.cli.group()
def translate():
"""Translation and localization commands."""
pass
@translate.command()
@click.argument('lang')
def init(lang):
"""Initialize a new language."""
if os.system('pybabel extract -F babel.cfg -k _l -o messages.pot .'):
raise RuntimeError('extract command failed')
if os.system(
'pybabel init -i messages.pot -d app/translations -l ' + lang):
raise RuntimeError('init command failed')
os.remove('messages.pot')
@translate.command()
def update():
"""Update all languages."""
if os.system('pybabel extract -F babel.cfg -k _l -o messages.pot .'):
raise RuntimeError('extract command failed')
if os.system('pybabel update -i messages.pot -d app/translations'):
raise RuntimeError('update command failed')
os.remove('messages.pot')
@translate.command()
def compile():
"""Compile all languages."""
if os.system('pybabel compile -d app/translations'):
raise RuntimeError('compile command failed')
@app.cli.command("init-db")
def init_db():
with app.app_context():
db.drop_all()
db.configure_mappers()
db.create_all()
private_key, public_key = RsaKeys.generate_keypair()
db.session.add(Site(name="PieFed", description='', public_key=public_key, private_key=private_key))
db.session.add(Instance(domain=app.config['SERVER_NAME'], software='PieFed')) # Instance 1 is always the local instance
db.session.add(Settings(name='allow_nsfw', value=json.dumps(False)))
db.session.add(Settings(name='allow_nsfl', value=json.dumps(False)))
db.session.add(Settings(name='allow_dislike', value=json.dumps(True)))
db.session.add(Settings(name='allow_local_image_posts', value=json.dumps(True)))
db.session.add(Settings(name='allow_remote_image_posts', value=json.dumps(True)))
db.session.add(Settings(name='registration_open', value=json.dumps(True)))
db.session.add(Settings(name='approve_registrations', value=json.dumps(False)))
db.session.add(Settings(name='federation', value=json.dumps(True)))
db.session.add(BannedInstances(domain='lemmygrad.ml'))
db.session.add(BannedInstances(domain='gab.com'))
db.session.add(BannedInstances(domain='rqd2.net'))
db.session.add(BannedInstances(domain='exploding-heads.com'))
db.session.add(BannedInstances(domain='hexbear.net'))
db.session.add(BannedInstances(domain='threads.net'))
db.session.add(BannedInstances(domain='pieville.net'))
db.session.add(BannedInstances(domain='noauthority.social'))
db.session.add(BannedInstances(domain='pieville.net'))
db.session.add(BannedInstances(domain='links.hackliberty.org'))
interests = file_get_contents('interests.txt')
db.session.add(Interest(name='🕊 Chilling', communities=parse_communities(interests, 'chilling')))
db.session.add(Interest(name='💭 Interesting stuff', communities=parse_communities(interests, 'interesting stuff')))
db.session.add(Interest(name='📰 News & Politics', communities=parse_communities(interests, 'news & politics')))
db.session.add(Interest(name='🎮 Gaming', communities=parse_communities(interests, 'gaming')))
db.session.add(Interest(name='🤓 Linux', communities=parse_communities(interests, 'linux')))
db.session.add(Interest(name='♻️ Environment', communities=parse_communities(interests, 'environment')))
db.session.add(Interest(name='🏳‍🌈 LGBTQ+', communities=parse_communities(interests, 'lgbtq')))
db.session.add(Interest(name='🛠 Programming', communities=parse_communities(interests, 'programming')))
db.session.add(Interest(name='🖥️ Tech', communities=parse_communities(interests, 'tech')))
db.session.add(Interest(name='🤗 Mental Health', communities=parse_communities(interests, 'mental health')))
db.session.add(Interest(name='💊 Health', communities=parse_communities(interests, 'health')))
# Load initial domain block list
block_list = retrieve_block_list()
if block_list:
for domain in block_list.split('\n'):
db.session.add(Domain(name=domain.strip(), banned=True))
# Initial roles
anon_role = Role(name='Anonymous user', weight=0)
anon_role.permissions.append(RolePermission(permission='register'))
db.session.add(anon_role)
auth_role = Role(name='Authenticated user', weight=1)
db.session.add(auth_role)
staff_role = Role(name='Staff', weight=2)
staff_role.permissions.append(RolePermission(permission='approve registrations'))
staff_role.permissions.append(RolePermission(permission='ban users'))
staff_role.permissions.append(RolePermission(permission='administer all communities'))
staff_role.permissions.append(RolePermission(permission='administer all users'))
db.session.add(staff_role)
admin_role = Role(name='Admin', weight=3)
admin_role.permissions.append(RolePermission(permission='approve registrations'))
admin_role.permissions.append(RolePermission(permission='change user roles'))
admin_role.permissions.append(RolePermission(permission='ban users'))
admin_role.permissions.append(RolePermission(permission='manage users'))
admin_role.permissions.append(RolePermission(permission='change instance settings'))
admin_role.permissions.append(RolePermission(permission='administer all communities'))
admin_role.permissions.append(RolePermission(permission='administer all users'))
db.session.add(admin_role)
# Admin user
user_name = input("Admin user name (ideally not 'admin'): ")
email = input("Admin email address: ")
password = input("Admin password: ")
verification_token = random_token(16)
admin_user = User(user_name=user_name, email=email, verification_token=verification_token)
admin_user.set_password(password)
admin_user.roles.append(admin_role)
admin_user.verified = True
db.session.add(admin_user)
db.session.commit()
print("Initial setup is finished.")
@app.cli.command('daily-maintenance')
def daily_maintenance():
with app.app_context():
"""Remove activity older than 3 days"""
db.session.query(ActivityPubLog).filter(
ActivityPubLog.created_at < utcnow() - timedelta(days=3)).delete()
db.session.commit()
@app.cli.command("spaceusage")
def spaceusage():
with app.app_context():
for user in User.query.all():
filesize = user.filesize()
num_content = user.num_content()
if filesize > 0 and num_content > 0:
print(f'{user.id},"{user.ap_id}",{filesize},{num_content}')
@app.cli.command("cleanupcovers")
def cleanupcovers():
with app.app_context():
for user in User.query.all():
if not user.is_local():
if user.cover_id:
file = user.cover
if file.file_path and file.thumbnail_path:
if os.path.exists(file.file_path):
os.unlink(file.file_path)
print(file.file_path)
file.file_path = ''
db.session.commit()
def list_files(directory):
for root, dirs, files in os.walk(directory):
for file in files:
yield os.path.join(root, file)
@app.cli.command("findorphanfiles")
def findorphanfiles():
with app.app_context():
for file_path in list_files('app/static/media/users'):
if 'thumbnail' in file_path:
f = File.query.filter(File.thumbnail_path == file_path).first()
else:
f = File.query.filter(File.file_path == file_path).first()
if f is None:
os.unlink(file_path)
def parse_communities(interests_source, segment):
lines = interests_source.split("\n")
include_in_output = False
output = []
for line in lines:
line = line.strip()
if line == segment:
include_in_output = True
continue
elif line == '':
include_in_output = False
if include_in_output:
output.append(line)
return "\n".join(output)