mirror of
https://codeberg.org/rimu/pyfedi
synced 2025-01-24 03:43:42 -08:00
320 lines
16 KiB
Python
320 lines
16 KiB
Python
from collections import namedtuple
|
|
from datetime import timedelta, timezone
|
|
from random import randint
|
|
|
|
from feedgen.feed import FeedGenerator
|
|
from flask import request, flash, json, url_for, current_app, redirect, abort, make_response, g
|
|
from flask import render_template as flask_render_template
|
|
from flask_login import login_required, current_user
|
|
from flask_babel import _
|
|
from sqlalchemy import text, desc, or_
|
|
|
|
from app.activitypub.signature import post_request
|
|
from app.constants import SUBSCRIPTION_NONMEMBER, SUBSCRIPTION_OWNER, SUBSCRIPTION_MODERATOR, POST_TYPE_IMAGE, \
|
|
POST_TYPE_LINK, POST_TYPE_VIDEO, NOTIF_TOPIC
|
|
from app.inoculation import inoculation
|
|
from app.models import Topic, Community, Post, utcnow, CommunityMember, CommunityJoinRequest, User, \
|
|
NotificationSubscription, read_posts
|
|
from app.topic import bp
|
|
from app.email import send_email, send_topic_suggestion
|
|
from app import db, celery, cache
|
|
from app.topic.forms import ChooseTopicsForm, SuggestTopicsForm
|
|
from app.utils import render_template, user_filters_posts, moderating_communities, joined_communities, \
|
|
community_membership, blocked_domains, validation_required, mimetype_from_url, blocked_instances, \
|
|
communities_banned_from, blocked_users, menu_topics, blocked_communities
|
|
|
|
|
|
@bp.route('/topic/<path:topic_path>', methods=['GET'])
|
|
def show_topic(topic_path):
|
|
|
|
page = request.args.get('page', 1, type=int)
|
|
sort = request.args.get('sort', '' if current_user.is_anonymous else current_user.default_sort)
|
|
low_bandwidth = request.cookies.get('low_bandwidth', '0') == '1'
|
|
post_layout = request.args.get('layout', 'list' if not low_bandwidth else None)
|
|
|
|
# translate topic_name from /topic/fediverse to topic_id
|
|
topic_url_parts = topic_path.split('/')
|
|
last_topic_machine_name = topic_url_parts[-1]
|
|
breadcrumbs = []
|
|
existing_url = '/topic'
|
|
topic = None
|
|
for url_part in topic_url_parts:
|
|
topic = Topic.query.filter(Topic.machine_name == url_part.strip().lower()).first()
|
|
if topic:
|
|
breadcrumb = namedtuple("Breadcrumb", ['text', 'url'])
|
|
breadcrumb.text = topic.name
|
|
breadcrumb.url = f"{existing_url}/{topic.machine_name}" if topic.machine_name != last_topic_machine_name else ''
|
|
breadcrumbs.append(breadcrumb)
|
|
existing_url = breadcrumb.url
|
|
else:
|
|
abort(404)
|
|
current_topic = topic
|
|
|
|
if current_topic:
|
|
# get posts from communities in that topic
|
|
posts = Post.query.join(Community, Post.community_id == Community.id).filter(Community.topic_id == current_topic.id, Community.banned == False)
|
|
|
|
# filter out nsfw and nsfl if desired
|
|
if current_user.is_anonymous:
|
|
posts = posts.filter(Post.from_bot == False, Post.nsfw == False, Post.nsfl == False, Post.deleted == False)
|
|
content_filters = {}
|
|
else:
|
|
if current_user.ignore_bots == 1:
|
|
posts = posts.filter(Post.from_bot == False)
|
|
if current_user.hide_nsfl == 1:
|
|
posts = posts.filter(Post.nsfl == False)
|
|
if current_user.hide_nsfw == 1:
|
|
posts = posts.filter(Post.nsfw == False)
|
|
if current_user.hide_read_posts:
|
|
posts = posts.outerjoin(read_posts, (Post.id == read_posts.c.read_post_id) & (read_posts.c.user_id == current_user.id))
|
|
posts = posts.filter(read_posts.c.read_post_id.is_(None)) # Filter where there is no corresponding read post for the current user
|
|
posts = posts.filter(Post.deleted == False)
|
|
content_filters = user_filters_posts(current_user.id)
|
|
|
|
# filter blocked domains and instances
|
|
domains_ids = blocked_domains(current_user.id)
|
|
if domains_ids:
|
|
posts = posts.filter(or_(Post.domain_id.not_in(domains_ids), Post.domain_id == None))
|
|
instance_ids = blocked_instances(current_user.id)
|
|
if instance_ids:
|
|
posts = posts.filter(or_(Post.instance_id.not_in(instance_ids), Post.instance_id == None))
|
|
community_ids = blocked_communities(current_user.id)
|
|
if community_ids:
|
|
posts = posts.filter(Post.community_id.not_in(community_ids))
|
|
# filter blocked users
|
|
blocked_accounts = blocked_users(current_user.id)
|
|
if blocked_accounts:
|
|
posts = posts.filter(Post.user_id.not_in(blocked_accounts))
|
|
|
|
banned_from = communities_banned_from(current_user.id)
|
|
if banned_from:
|
|
posts = posts.filter(Post.community_id.not_in(banned_from))
|
|
|
|
# sorting
|
|
if sort == '' or sort == 'hot':
|
|
posts = posts.order_by(desc(Post.ranking)).order_by(desc(Post.posted_at))
|
|
elif sort == 'top':
|
|
posts = posts.filter(Post.posted_at > utcnow() - timedelta(days=7)).order_by(desc(Post.up_votes - Post.down_votes))
|
|
elif sort == 'new':
|
|
posts = posts.order_by(desc(Post.posted_at))
|
|
elif sort == 'active':
|
|
posts = posts.order_by(desc(Post.last_active))
|
|
|
|
# paging
|
|
per_page = 100
|
|
if post_layout == 'masonry':
|
|
per_page = 200
|
|
elif post_layout == 'masonry_wide':
|
|
per_page = 300
|
|
posts = posts.paginate(page=page, per_page=per_page, error_out=False)
|
|
|
|
topic_communities = Community.query.filter(Community.topic_id == current_topic.id, Community.banned == False).order_by(Community.name)
|
|
|
|
next_url = url_for('topic.show_topic',
|
|
topic_path=topic_path,
|
|
page=posts.next_num, sort=sort, layout=post_layout) if posts.has_next else None
|
|
prev_url = url_for('topic.show_topic',
|
|
topic_path=topic_path,
|
|
page=posts.prev_num, sort=sort, layout=post_layout) if posts.has_prev and page != 1 else None
|
|
|
|
sub_topics = Topic.query.filter_by(parent_id=current_topic.id).order_by(Topic.name).all()
|
|
|
|
return render_template('topic/show_topic.html', title=_(current_topic.name), posts=posts, topic=current_topic, sort=sort,
|
|
page=page, post_layout=post_layout, next_url=next_url, prev_url=prev_url,
|
|
topic_communities=topic_communities, content_filters=content_filters,
|
|
sub_topics=sub_topics, topic_path=topic_path, breadcrumbs=breadcrumbs,
|
|
rss_feed=f"https://{current_app.config['SERVER_NAME']}/topic/{topic_path}.rss",
|
|
rss_feed_name=f"{current_topic.name} on {g.site.name}",
|
|
show_post_community=True, moderating_communities=moderating_communities(current_user.get_id()),
|
|
joined_communities=joined_communities(current_user.get_id()),
|
|
menu_topics=menu_topics(),
|
|
inoculation=inoculation[randint(0, len(inoculation) - 1)] if g.site.show_inoculation_block else None,
|
|
POST_TYPE_LINK=POST_TYPE_LINK, POST_TYPE_IMAGE=POST_TYPE_IMAGE,
|
|
POST_TYPE_VIDEO=POST_TYPE_VIDEO,
|
|
SUBSCRIPTION_OWNER=SUBSCRIPTION_OWNER, SUBSCRIPTION_MODERATOR=SUBSCRIPTION_MODERATOR,
|
|
)
|
|
else:
|
|
abort(404)
|
|
|
|
|
|
@bp.route('/topic/<path:topic_path>.rss', methods=['GET'])
|
|
@cache.cached(timeout=600)
|
|
def show_topic_rss(topic_path):
|
|
topic_url_parts = topic_path.split('/')
|
|
last_topic_machine_name = topic_url_parts[-1]
|
|
topic = Topic.query.filter(Topic.machine_name == last_topic_machine_name.strip().lower()).first()
|
|
|
|
if topic:
|
|
posts = Post.query.join(Community, Post.community_id == Community.id).filter(Community.topic_id == topic.id,
|
|
Community.banned == False)
|
|
posts = posts.filter(Post.from_bot == False, Post.nsfw == False, Post.nsfl == False, Post.deleted == False)
|
|
posts = posts.order_by(desc(Post.created_at)).limit(100).all()
|
|
|
|
fg = FeedGenerator()
|
|
fg.id(f"https://{current_app.config['SERVER_NAME']}/topic/{last_topic_machine_name}")
|
|
fg.title(f'{topic.name} on {g.site.name}')
|
|
fg.link(href=f"https://{current_app.config['SERVER_NAME']}/topic/{last_topic_machine_name}", rel='alternate')
|
|
fg.logo(f"https://{current_app.config['SERVER_NAME']}/static/images/apple-touch-icon.png")
|
|
fg.subtitle(' ')
|
|
fg.link(href=f"https://{current_app.config['SERVER_NAME']}/topic/{last_topic_machine_name}.rss", rel='self')
|
|
fg.language('en')
|
|
|
|
for post in posts:
|
|
fe = fg.add_entry()
|
|
fe.title(post.title)
|
|
fe.link(href=f"https://{current_app.config['SERVER_NAME']}/post/{post.id}")
|
|
if post.url:
|
|
type = mimetype_from_url(post.url)
|
|
if type and not type.startswith('text/'):
|
|
fe.enclosure(post.url, type=type)
|
|
fe.description(post.body_html)
|
|
fe.guid(post.profile_id(), permalink=True)
|
|
fe.author(name=post.author.user_name)
|
|
fe.pubDate(post.created_at.replace(tzinfo=timezone.utc))
|
|
|
|
response = make_response(fg.rss_str())
|
|
response.headers.set('Content-Type', 'application/rss+xml')
|
|
response.headers.add_header('ETag', f"{topic.id}_{hash(g.site.last_active)}")
|
|
response.headers.add_header('Cache-Control', 'no-cache, max-age=600, must-revalidate')
|
|
return response
|
|
else:
|
|
abort(404)
|
|
|
|
|
|
@bp.route('/choose_topics', methods=['GET', 'POST'])
|
|
@login_required
|
|
def choose_topics():
|
|
form = ChooseTopicsForm()
|
|
form.chosen_topics.choices = topics_for_form()
|
|
if form.validate_on_submit():
|
|
if form.chosen_topics.data:
|
|
for topic_id in form.chosen_topics.data:
|
|
join_topic(topic_id)
|
|
flash(_('You have joined some communities relating to those interests. Find them on the Topics menu or browse the home page.'))
|
|
cache.delete_memoized(joined_communities, current_user.id)
|
|
return redirect(url_for('main.index'))
|
|
else:
|
|
flash(_('You did not choose any topics. Would you like to choose individual communities instead?'))
|
|
return redirect(url_for('main.list_communities'))
|
|
else:
|
|
return render_template('topic/choose_topics.html', form=form,
|
|
moderating_communities=moderating_communities(current_user.get_id()),
|
|
joined_communities=joined_communities(current_user.get_id()),
|
|
menu_topics=menu_topics(), site=g.site,
|
|
)
|
|
|
|
|
|
@bp.route('/topic/<topic_name>/submit', methods=['GET', 'POST'])
|
|
@login_required
|
|
@validation_required
|
|
def topic_create_post(topic_name):
|
|
topic = Topic.query.filter(Topic.machine_name == topic_name.strip().lower()).first()
|
|
if not topic:
|
|
abort(404)
|
|
communities = Community.query.filter_by(topic_id=topic.id, banned=False).order_by(Community.title).all()
|
|
child_topics = [topic.id for topic in Topic.query.filter(Topic.parent_id == topic.id).all()]
|
|
sub_communities = Community.query.filter_by(banned=False).filter(Community.topic_id.in_(child_topics)).order_by(Community.title).all()
|
|
if request.form.get('community_id', '') != '':
|
|
community = Community.query.get_or_404(int(request.form.get('community_id')))
|
|
return redirect(url_for('community.join_then_add', actor=community.link()))
|
|
return render_template('topic/topic_create_post.html', communities=communities, sub_communities=sub_communities,
|
|
topic=topic,
|
|
moderating_communities=moderating_communities(current_user.get_id()),
|
|
joined_communities=joined_communities(current_user.get_id()),
|
|
menu_topics=menu_topics(),
|
|
SUBSCRIPTION_OWNER=SUBSCRIPTION_OWNER, SUBSCRIPTION_MODERATOR=SUBSCRIPTION_MODERATOR)
|
|
|
|
|
|
@bp.route('/topic/<int:topic_id>/notification', methods=['GET', 'POST'])
|
|
@login_required
|
|
def topic_notification(topic_id: int):
|
|
# Toggle whether the current user is subscribed to notifications about this community's posts or not
|
|
topic = Topic.query.get_or_404(topic_id)
|
|
existing_notification = NotificationSubscription.query.filter(NotificationSubscription.entity_id == topic.id,
|
|
NotificationSubscription.user_id == current_user.id,
|
|
NotificationSubscription.type == NOTIF_TOPIC).first()
|
|
if existing_notification:
|
|
db.session.delete(existing_notification)
|
|
db.session.commit()
|
|
else: # no subscription yet, so make one
|
|
new_notification = NotificationSubscription(name=topic.name, user_id=current_user.id, entity_id=topic.id,
|
|
type=NOTIF_TOPIC)
|
|
db.session.add(new_notification)
|
|
db.session.commit()
|
|
|
|
return render_template('topic/_notification_toggle.html', topic=topic)
|
|
|
|
|
|
@bp.route('/topics/new', methods=['GET', 'POST'])
|
|
@login_required
|
|
def suggest_topics():
|
|
form = SuggestTopicsForm()
|
|
if not current_user.trustworthy():
|
|
return redirect(url_for('topic.suggestion_denied'))
|
|
if form.validate_on_submit():
|
|
subject = f'New topic suggestion from {g.site.name}'
|
|
recipients = g.site.contact_email
|
|
topic_name = form.topic_name.data
|
|
communities_for_topic = form.communities_for_topic.data
|
|
send_topic_suggestion(communities_for_topic, current_user, recipients, subject, topic_name)
|
|
flash(_(f'Thank you for the topic suggestion, it has been sent to the site administrator(s).'))
|
|
return redirect(url_for('main.list_topics'))
|
|
else:
|
|
return render_template('topic/suggest_topics.html', form=form, title=_('Suggest a topic"'),
|
|
moderating_communities=moderating_communities(current_user.get_id()),
|
|
joined_communities=joined_communities(current_user.get_id()),
|
|
menu_topics=menu_topics(),
|
|
site=g.site)
|
|
|
|
|
|
@bp.route('/topic/suggestion-denied', methods=['GET'])
|
|
@login_required
|
|
def suggestion_denied():
|
|
return render_template('topic/suggestion_denied.html')
|
|
|
|
|
|
def topics_for_form():
|
|
topics = Topic.query.filter_by(parent_id=None).order_by(Topic.name).all()
|
|
result = []
|
|
for topic in topics:
|
|
result.append((topic.id, topic.name))
|
|
sub_topics = Topic.query.filter_by(parent_id=topic.id).order_by(Topic.name).all()
|
|
for sub_topic in sub_topics:
|
|
result.append((sub_topic.id, ' --- ' + sub_topic.name))
|
|
return result
|
|
|
|
|
|
def join_topic(topic_id):
|
|
communities = Community.query.filter_by(topic_id=topic_id, banned=False).all()
|
|
for community in communities:
|
|
if not community.user_is_banned(current_user) and community_membership(current_user, community) == SUBSCRIPTION_NONMEMBER:
|
|
if not community.is_local():
|
|
join_request = CommunityJoinRequest(user_id=current_user.id, community_id=community.id)
|
|
db.session.add(join_request)
|
|
db.session.commit()
|
|
if current_app.debug:
|
|
send_community_follow(community.id, join_request, current_user.id)
|
|
else:
|
|
send_community_follow.delay(community.id, join_request.id, current_user.id)
|
|
|
|
member = CommunityMember(user_id=current_user.id, community_id=community.id)
|
|
db.session.add(member)
|
|
db.session.commit()
|
|
cache.delete_memoized(community_membership, current_user, community)
|
|
|
|
|
|
@celery.task
|
|
def send_community_follow(community_id, join_request_id, user_id):
|
|
with current_app.app_context():
|
|
user = User.query.get(user_id)
|
|
community = Community.query.get(community_id)
|
|
if not community.instance.gone_forever:
|
|
follow = {
|
|
"actor": user.public_url(),
|
|
"to": [community.public_url()],
|
|
"object": community.public_url(),
|
|
"type": "Follow",
|
|
"id": f"https://{current_app.config['SERVER_NAME']}/activities/follow/{join_request_id}"
|
|
}
|
|
post_request(community.ap_inbox_url, follow, user.private_key,
|
|
user.public_url() + '#main-key')
|