mirror of
https://codeberg.org/rimu/pyfedi
synced 2025-01-23 19:36:56 -08:00
Merge remote-tracking branch 'origin/main'
This commit is contained in:
commit
da66219ab8
4 changed files with 225 additions and 55 deletions
|
@ -25,7 +25,7 @@ from app.activitypub.util import public_key, users_total, active_half_year, acti
|
|||
user_removed_from_remote_server, create_post, create_post_reply, update_post_reply_from_activity, \
|
||||
update_post_from_activity, undo_vote, undo_downvote, post_to_page, get_redis_connection, find_reported_object, \
|
||||
process_report, ensure_domains_match, can_edit, can_delete, remove_data_from_banned_user, resolve_remote_post, \
|
||||
inform_followers_of_post_update, comment_model_to_json, restore_post_or_comment
|
||||
inform_followers_of_post_update, comment_model_to_json, restore_post_or_comment, ban_local_user, unban_local_user
|
||||
from app.utils import gibberish, get_setting, is_image_url, allowlist_html, render_template, \
|
||||
domain_from_url, markdown_to_html, community_membership, ap_datetime, ip_address, can_downvote, \
|
||||
can_upvote, can_create_post, awaken_dormant_instance, shorten_string, can_create_post_reply, sha256_digest, \
|
||||
|
@ -856,6 +856,14 @@ def process_inbox_request(request_json, activitypublog_id, ip_address):
|
|||
if 'object' in request_json and 'object' in request_json['object']:
|
||||
restore_post_or_comment(request_json['object']['object'])
|
||||
activity_log.result = 'success'
|
||||
elif request_json['object']['object']['type'] == 'Block':
|
||||
activity_log.activity_type = 'Undo User Ban'
|
||||
deletor_ap_id = request_json['object']['object']['actor']
|
||||
user_ap_id = request_json['object']['object']['object']
|
||||
target = request_json['object']['object']['target']
|
||||
if target == request_json['actor'] and user_ap_id.startswith('https://' + current_app.config['SERVER_NAME']):
|
||||
unban_local_user(deletor_ap_id, user_ap_id, target)
|
||||
activity_log.result = 'success'
|
||||
elif request_json['object']['type'] == 'Add' and 'target' in request_json['object']:
|
||||
activity_log.activity_type = request_json['object']['type']
|
||||
target = request_json['object']['target']
|
||||
|
@ -899,13 +907,16 @@ def process_inbox_request(request_json, activitypublog_id, ip_address):
|
|||
existing_membership.is_moderator = False
|
||||
activity_log.result = 'success'
|
||||
elif request_json['object']['type'] == 'Block' and 'target' in request_json['object']:
|
||||
activity_log.activity_type = 'Community Ban'
|
||||
mod_ap_id = request_json['object']['actor']
|
||||
activity_log.activity_type = 'User Ban'
|
||||
deletor_ap_id = request_json['object']['actor']
|
||||
user_ap_id = request_json['object']['object']
|
||||
target = request_json['object']['target']
|
||||
remove_data = request_json['object']['removeData']
|
||||
if target == request_json['actor'] and remove_data == True:
|
||||
remove_data_from_banned_user(mod_ap_id, user_ap_id, target)
|
||||
if target == request_json['actor']:
|
||||
if remove_data == True:
|
||||
remove_data_from_banned_user(deletor_ap_id, user_ap_id, target)
|
||||
if user_ap_id.startswith('https://' + current_app.config['SERVER_NAME']):
|
||||
ban_local_user(deletor_ap_id, user_ap_id, target, request_json['object'])
|
||||
activity_log.result = 'success'
|
||||
else:
|
||||
activity_log.exception_message = 'Invalid type for Announce'
|
||||
|
@ -1050,6 +1061,14 @@ def process_inbox_request(request_json, activitypublog_id, ip_address):
|
|||
if post_or_comment:
|
||||
announce_activity_to_followers(post_or_comment.community, user, request_json)
|
||||
activity_log.result = 'success'
|
||||
elif request_json['object']['type'] == 'Block': # Undoing a ban
|
||||
activity_log.activity_type = 'Undo User Ban'
|
||||
deletor_ap_id = request_json['object']['actor']
|
||||
user_ap_id = request_json['object']['object']
|
||||
target = request_json['object']['target']
|
||||
if user_ap_id.startswith('https://' + current_app.config['SERVER_NAME']):
|
||||
unban_local_user(deletor_ap_id, user_ap_id, target)
|
||||
activity_log.result = 'success'
|
||||
elif request_json['type'] == 'Delete':
|
||||
if isinstance(request_json['object'], str):
|
||||
ap_id = request_json['object'] # lemmy
|
||||
|
@ -1173,13 +1192,15 @@ def process_inbox_request(request_json, activitypublog_id, ip_address):
|
|||
else:
|
||||
activity_log.exception_message = 'Report ignored due to missing user or content'
|
||||
elif request_json['type'] == 'Block':
|
||||
activity_log.activity_type = 'Site Ban'
|
||||
admin_ap_id = request_json['actor']
|
||||
activity_log.activity_type = 'User Ban'
|
||||
deletor_ap_id = request_json['actor']
|
||||
user_ap_id = request_json['object']
|
||||
target = request_json['target']
|
||||
remove_data = request_json['removeData']
|
||||
if remove_data == True:
|
||||
remove_data_from_banned_user(admin_ap_id, user_ap_id, target)
|
||||
remove_data_from_banned_user(deletor_ap_id, user_ap_id, target)
|
||||
if user_ap_id.startswith('https://' + current_app.config['SERVER_NAME']):
|
||||
ban_local_user(deletor_ap_id, user_ap_id, target, request_json)
|
||||
activity_log.result = 'success'
|
||||
|
||||
# Flush the caches of any major object that was created. To be sure.
|
||||
|
|
|
@ -2,7 +2,7 @@ from __future__ import annotations
|
|||
|
||||
import html
|
||||
import os
|
||||
from datetime import timedelta
|
||||
from datetime import timedelta, datetime, timezone
|
||||
from random import randint
|
||||
from typing import Union, Tuple, List
|
||||
|
||||
|
@ -14,7 +14,7 @@ from sqlalchemy import text, func, desc
|
|||
from app import db, cache, constants, celery
|
||||
from app.models import User, Post, Community, BannedInstances, File, PostReply, AllowedInstances, Instance, utcnow, \
|
||||
PostVote, PostReplyVote, ActivityPubLog, Notification, Site, CommunityMember, InstanceRole, Report, Conversation, \
|
||||
Language, Tag, Poll, PollChoice, UserFollower
|
||||
Language, Tag, Poll, PollChoice, UserFollower, CommunityBan, CommunityJoinRequest, NotificationSubscription
|
||||
from app.activitypub.signature import signed_get_request, post_request
|
||||
import time
|
||||
import base64
|
||||
|
@ -32,7 +32,8 @@ from app.utils import get_request, allowlist_html, get_setting, ap_datetime, mar
|
|||
shorten_string, reply_already_exists, reply_is_just_link_to_gif_reaction, confidence, remove_tracking_from_link, \
|
||||
blocked_phrases, microblog_content_to_title, generate_image_from_video_url, is_video_url, reply_is_stupid, \
|
||||
notification_subscribers, communities_banned_from, lemmy_markdown_to_html, actor_contains_blocked_words, \
|
||||
html_to_text, opengraph_parse, url_to_thumbnail_file, add_to_modlog_activitypub
|
||||
html_to_text, opengraph_parse, url_to_thumbnail_file, add_to_modlog_activitypub, joined_communities, \
|
||||
moderating_communities
|
||||
|
||||
from sqlalchemy import or_
|
||||
|
||||
|
@ -845,14 +846,13 @@ def post_json_to_model(activity_log, post_json, user, community) -> Post:
|
|||
instance_id=user.instance_id,
|
||||
indexable = user.indexable
|
||||
)
|
||||
if 'source' in post_json and \
|
||||
post_json['source']['mediaType'] == 'text/markdown':
|
||||
post.body = post_json['source']['content']
|
||||
post.body_html = lemmy_markdown_to_html(post.body)
|
||||
elif 'content' in post_json:
|
||||
if 'content' in post_json:
|
||||
if post_json['mediaType'] == 'text/html':
|
||||
post.body_html = allowlist_html(post_json['content'])
|
||||
post.body = html_to_text(post.body_html)
|
||||
if 'source' in post_json and post_json['source']['mediaType'] == 'text/markdown':
|
||||
post.body = post_json['source']['content']
|
||||
else:
|
||||
post.body = html_to_text(post.body_html)
|
||||
elif post_json['mediaType'] == 'text/markdown':
|
||||
post.body = post_json['content']
|
||||
post.body_html = markdown_to_html(post.body)
|
||||
|
@ -1555,6 +1555,134 @@ def remove_data_from_banned_user_task(deletor_ap_id, user_ap_id, target):
|
|||
db.session.commit()
|
||||
|
||||
|
||||
def ban_local_user(deletor_ap_id, user_ap_id, target, request_json):
|
||||
if current_app.debug:
|
||||
ban_local_user_task(deletor_ap_id, user_ap_id, target, request_json)
|
||||
else:
|
||||
ban_local_user_task.delay(deletor_ap_id, user_ap_id, target, request_json)
|
||||
|
||||
|
||||
@celery.task
|
||||
def ban_local_user_task(deletor_ap_id, user_ap_id, target, request_json):
|
||||
# same info in 'Block' and 'Announce/Block' can be sent at same time, and both call this function
|
||||
ban_in_progress = cache.get(f'{deletor_ap_id} is banning {user_ap_id} from {target}')
|
||||
if not ban_in_progress:
|
||||
cache.set(f'{deletor_ap_id} is banning {user_ap_id} from {target}', True, timeout=60)
|
||||
else:
|
||||
return
|
||||
|
||||
deletor = find_actor_or_create(deletor_ap_id, create_if_not_found=False)
|
||||
user = find_actor_or_create(user_ap_id, create_if_not_found=False)
|
||||
community = Community.query.filter_by(ap_profile_id=target).first()
|
||||
|
||||
if not deletor or not user:
|
||||
return
|
||||
|
||||
# site bans by admins
|
||||
if deletor.instance.user_is_admin(deletor.id) and target == f"https://{deletor.instance.domain}/":
|
||||
# need instance_ban table?
|
||||
...
|
||||
|
||||
# community bans by mods or admins
|
||||
elif community and (community.is_moderator(deletor) or community.is_instance_admin(deletor)):
|
||||
existing = CommunityBan.query.filter_by(community_id=community.id, user_id=user.id).first()
|
||||
|
||||
if not existing:
|
||||
new_ban = CommunityBan(community_id=community.id, user_id=user.id, banned_by=deletor.id)
|
||||
if 'summary' in request_json:
|
||||
new_ban.reason=request_json['summary']
|
||||
|
||||
if 'expires' in request_json and datetime.fromisoformat(request_json['expires']) > datetime.now(timezone.utc):
|
||||
new_ban.ban_until = datetime.fromisoformat(request_json['expires'])
|
||||
elif 'endTime' in request_json and datetime.fromisoformat(request_json['endTime']) > datetime.now(timezone.utc):
|
||||
new_ban.ban_until = datetime.fromisoformat(request_json['endTime'])
|
||||
|
||||
db.session.add(new_ban)
|
||||
db.session.commit()
|
||||
|
||||
db.session.query(CommunityJoinRequest).filter(CommunityJoinRequest.community_id == community.id, CommunityJoinRequest.user_id == user.id).delete()
|
||||
|
||||
community_membership_record = CommunityMember.query.filter_by(community_id=community.id, user_id=user.id).first()
|
||||
if community_membership_record:
|
||||
community_membership_record.is_banned = True
|
||||
|
||||
cache.delete_memoized(communities_banned_from, user.id)
|
||||
cache.delete_memoized(joined_communities, user.id)
|
||||
cache.delete_memoized(moderating_communities, user.id)
|
||||
|
||||
# Notify banned person
|
||||
notify = Notification(title=shorten_string('You have been banned from ' + community.title),
|
||||
url=f'/notifications', user_id=user.id,
|
||||
author_id=deletor.id)
|
||||
db.session.add(notify)
|
||||
if not current_app.debug: # user.unread_notifications += 1 hangs app if 'user' is the same person
|
||||
user.unread_notifications += 1 # who pressed 'Re-submit this activity'.
|
||||
db.session.commit()
|
||||
|
||||
# Remove their notification subscription, if any
|
||||
db.session.query(NotificationSubscription).filter(NotificationSubscription.entity_id == community.id,
|
||||
NotificationSubscription.user_id == user.id,
|
||||
NotificationSubscription.type == NOTIF_COMMUNITY).delete()
|
||||
|
||||
add_to_modlog_activitypub('ban_user', deletor, community_id=community.id, link_text=user.display_name(), link=user.link())
|
||||
|
||||
|
||||
def unban_local_user(deletor_ap_id, user_ap_id, target):
|
||||
if current_app.debug:
|
||||
unban_local_user_task(deletor_ap_id, user_ap_id, target)
|
||||
else:
|
||||
unban_local_user_task.delay(deletor_ap_id, user_ap_id, target)
|
||||
|
||||
|
||||
@celery.task
|
||||
def unban_local_user_task(deletor_ap_id, user_ap_id, target):
|
||||
# same info in 'Block' and 'Announce/Block' can be sent at same time, and both call this function
|
||||
unban_in_progress = cache.get(f'{deletor_ap_id} is undoing ban of {user_ap_id} from {target}')
|
||||
if not unban_in_progress:
|
||||
cache.set(f'{deletor_ap_id} is undoing ban of {user_ap_id} from {target}', True, timeout=60)
|
||||
else:
|
||||
return
|
||||
|
||||
deletor = find_actor_or_create(deletor_ap_id, create_if_not_found=False)
|
||||
user = find_actor_or_create(user_ap_id, create_if_not_found=False)
|
||||
community = Community.query.filter_by(ap_profile_id=target).first()
|
||||
|
||||
if not deletor or not user:
|
||||
return
|
||||
|
||||
# site undo bans by admins
|
||||
if deletor.instance.user_is_admin(deletor.id) and target == f"https://{deletor.instance.domain}/":
|
||||
# need instance_ban table?
|
||||
...
|
||||
|
||||
# community undo bans by mods or admins
|
||||
elif community and (community.is_moderator(deletor) or community.is_instance_admin(deletor)):
|
||||
existing_ban = CommunityBan.query.filter_by(community_id=community.id, user_id=user.id).first()
|
||||
if existing_ban:
|
||||
db.session.delete(existing_ban)
|
||||
db.session.commit()
|
||||
|
||||
community_membership_record = CommunityMember.query.filter_by(community_id=community.id, user_id=user.id).first()
|
||||
if community_membership_record:
|
||||
community_membership_record.is_banned = False
|
||||
db.session.commit()
|
||||
|
||||
cache.delete_memoized(communities_banned_from, user.id)
|
||||
cache.delete_memoized(joined_communities, user.id)
|
||||
cache.delete_memoized(moderating_communities, user.id)
|
||||
|
||||
# Notify previously banned person
|
||||
notify = Notification(title=shorten_string('You have been un-banned from ' + community.title),
|
||||
url=f'/notifications', user_id=user.id,
|
||||
author_id=deletor.id)
|
||||
db.session.add(notify)
|
||||
if not current_app.debug: # user.unread_notifications += 1 hangs app if 'user' is the same person
|
||||
user.unread_notifications += 1 # who pressed 'Re-submit this activity'.
|
||||
db.session.commit()
|
||||
|
||||
add_to_modlog_activitypub('unban_user', deletor, community_id=community.id, link_text=user.display_name(), link=user.link())
|
||||
|
||||
|
||||
def create_post_reply(activity_log: ActivityPubLog, community: Community, in_reply_to, request_json: dict, user: User, announce_id=None) -> Union[PostReply, None]:
|
||||
if community.local_only:
|
||||
activity_log.exception_message = 'Community is local only, reply discarded'
|
||||
|
@ -1582,17 +1710,15 @@ def create_post_reply(activity_log: ActivityPubLog, community: Community, in_rep
|
|||
ap_create_id=request_json['id'],
|
||||
ap_announce_id=announce_id,
|
||||
instance_id=user.instance_id)
|
||||
# Get comment content. Lemmy puts this in unusual place.
|
||||
if 'source' in request_json['object'] and isinstance(request_json['object']['source'], dict) and \
|
||||
'mediaType' in request_json['object']['source'] and \
|
||||
request_json['object']['source']['mediaType'] == 'text/markdown':
|
||||
post_reply.body = request_json['object']['source']['content']
|
||||
post_reply.body_html = lemmy_markdown_to_html(post_reply.body)
|
||||
elif 'content' in request_json['object']: # Kbin, Mastodon, etc provide their posts as html
|
||||
if 'content' in request_json['object']: # Kbin, Mastodon, etc provide their posts as html
|
||||
if not request_json['object']['content'].startswith('<p>') or not request_json['object']['content'].startswith('<blockquote>'):
|
||||
request_json['object']['content'] = '<p>' + request_json['object']['content'] + '</p>'
|
||||
post_reply.body_html = allowlist_html(request_json['object']['content'])
|
||||
post_reply.body = html_to_text(post_reply.body_html)
|
||||
if 'source' in request_json['object'] and isinstance(request_json['object']['source'], dict) and \
|
||||
'mediaType' in request_json['object']['source'] and request_json['object']['source']['mediaType'] == 'text/markdown':
|
||||
post_reply.body = request_json['object']['source']['content']
|
||||
else:
|
||||
post_reply.body = html_to_text(post_reply.body_html)
|
||||
# Language - Lemmy uses 'language' while Mastodon uses 'contentMap'
|
||||
if 'language' in request_json['object'] and isinstance(request_json['object']['language'], dict):
|
||||
language = find_language_or_create(request_json['object']['language']['identifier'],
|
||||
|
@ -1714,18 +1840,19 @@ def create_post(activity_log: ActivityPubLog, community: Community, request_json
|
|||
indexable=user.indexable,
|
||||
microblog=microblog
|
||||
)
|
||||
# Get post content. Lemmy and Kbin put this in different places.
|
||||
if 'source' in request_json['object'] and isinstance(request_json['object']['source'], dict) and request_json['object']['source']['mediaType'] == 'text/markdown': # Lemmy
|
||||
post.body = request_json['object']['source']['content']
|
||||
post.body_html = lemmy_markdown_to_html(post.body)
|
||||
elif 'content' in request_json['object'] and request_json['object']['content'] is not None: # Kbin
|
||||
if 'content' in request_json['object'] and request_json['object']['content'] is not None:
|
||||
if 'mediaType' in request_json['object'] and request_json['object']['mediaType'] == 'text/html':
|
||||
post.body_html = allowlist_html(request_json['object']['content'])
|
||||
post.body = html_to_text(post.body_html)
|
||||
if 'source' in request_json['object'] and isinstance(request_json['object']['source'], dict) and request_json['object']['source']['mediaType'] == 'text/markdown':
|
||||
post.body = request_json['object']['source']['content']
|
||||
else:
|
||||
post.body = html_to_text(post.body_html)
|
||||
elif 'mediaType' in request_json['object'] and request_json['object']['mediaType'] == 'text/markdown':
|
||||
post.body = request_json['object']['content']
|
||||
post.body_html = markdown_to_html(post.body)
|
||||
else:
|
||||
if not request_json['object']['content'].startswith('<p>') or not request_json['object']['content'].startswith('<blockquote>'):
|
||||
request_json['object']['content'] = '<p>' + request_json['object']['content'] + '</p>'
|
||||
post.body_html = allowlist_html(request_json['object']['content'])
|
||||
post.body = html_to_text(post.body_html)
|
||||
if microblog:
|
||||
|
@ -1941,14 +2068,15 @@ def notify_about_post_reply(parent_reply: Union[PostReply, None], new_reply: Pos
|
|||
|
||||
|
||||
def update_post_reply_from_activity(reply: PostReply, request_json: dict):
|
||||
if 'source' in request_json['object'] and \
|
||||
isinstance(request_json['object']['source'], dict) and \
|
||||
request_json['object']['source']['mediaType'] == 'text/markdown':
|
||||
reply.body = request_json['object']['source']['content']
|
||||
reply.body_html = lemmy_markdown_to_html(reply.body)
|
||||
elif 'content' in request_json['object']:
|
||||
if 'content' in request_json['object']: # Kbin, Mastodon, etc provide their posts as html
|
||||
if not request_json['object']['content'].startswith('<p>') or not request_json['object']['content'].startswith('<blockquote>'):
|
||||
request_json['object']['content'] = '<p>' + request_json['object']['content'] + '</p>'
|
||||
reply.body_html = allowlist_html(request_json['object']['content'])
|
||||
reply.body = ''
|
||||
if 'source' in request_json['object'] and isinstance(request_json['object']['source'], dict) and \
|
||||
'mediaType' in request_json['object']['source'] and request_json['object']['source']['mediaType'] == 'text/markdown':
|
||||
reply.body = request_json['object']['source']['content']
|
||||
else:
|
||||
reply.body = html_to_text(post_reply.body_html)
|
||||
# Language
|
||||
if 'language' in request_json['object'] and isinstance(request_json['object']['language'], dict):
|
||||
language = find_language_or_create(request_json['object']['language']['identifier'], request_json['object']['language']['name'])
|
||||
|
@ -1965,19 +2093,19 @@ def update_post_from_activity(post: Post, request_json: dict):
|
|||
|
||||
nsfl_in_title = '[NSFL]' in name.upper() or '(NSFL)' in name.upper()
|
||||
post.title = name
|
||||
if 'source' in request_json['object'] and \
|
||||
isinstance(request_json['object']['source'], dict) and \
|
||||
request_json['object']['source']['mediaType'] == 'text/markdown':
|
||||
post.body = request_json['object']['source']['content']
|
||||
post.body_html = lemmy_markdown_to_html(post.body)
|
||||
elif 'content' in request_json['object'] and request_json['object']['content'] is not None: # Kbin
|
||||
if 'content' in request_json['object'] and request_json['object']['content'] is not None:
|
||||
if 'mediaType' in request_json['object'] and request_json['object']['mediaType'] == 'text/html':
|
||||
post.body_html = allowlist_html(request_json['object']['content'])
|
||||
post.body = html_to_text(post.body_html)
|
||||
if 'source' in request_json['object'] and isinstance(request_json['object']['source'], dict) and request_json['object']['source']['mediaType'] == 'text/markdown':
|
||||
post.body = request_json['object']['source']['content']
|
||||
else:
|
||||
post.body = html_to_text(post.body_html)
|
||||
elif 'mediaType' in request_json['object'] and request_json['object']['mediaType'] == 'text/markdown':
|
||||
post.body = request_json['object']['content']
|
||||
post.body_html = markdown_to_html(post.body)
|
||||
else:
|
||||
if not request_json['object']['content'].startswith('<p>') or not request_json['object']['content'].startswith('<blockquote>'):
|
||||
request_json['object']['content'] = '<p>' + request_json['object']['content'] + '</p>'
|
||||
post.body_html = allowlist_html(request_json['object']['content'])
|
||||
post.body = html_to_text(post.body_html)
|
||||
if name == "[Microblog]":
|
||||
|
|
|
@ -1008,9 +1008,9 @@ def federate_post_update(post):
|
|||
page_json = {
|
||||
'type': 'Page',
|
||||
'id': post.ap_id,
|
||||
'attributedTo': current_user.ap_profile_id,
|
||||
'attributedTo': current_user.public_url(),
|
||||
'to': [
|
||||
post.community.ap_profile_id,
|
||||
post.community.public_url(),
|
||||
'https://www.w3.org/ns/activitystreams#Public'
|
||||
],
|
||||
'name': post.title,
|
||||
|
@ -1024,7 +1024,7 @@ def federate_post_update(post):
|
|||
'stickied': post.sticky,
|
||||
'published': ap_datetime(post.posted_at),
|
||||
'updated': ap_datetime(post.edited_at),
|
||||
'audience': post.community.ap_profile_id,
|
||||
'audience': post.community.public_url(),
|
||||
'language': {
|
||||
'identifier': post.language_code(),
|
||||
'name': post.language_name()
|
||||
|
|
35
app/utils.py
35
app/utils.py
|
@ -270,9 +270,29 @@ def allowlist_html(html: str, a_target='_blank') -> str:
|
|||
if tag.name == 'table':
|
||||
tag.attrs['class'] = 'table'
|
||||
|
||||
clean_html = str(soup)
|
||||
|
||||
# avoid returning empty anchors
|
||||
re_empty_anchor = re.compile(r'<a href="(.*?)" rel="nofollow ugc" target="_blank"><\/a>')
|
||||
return re_empty_anchor.sub(r'<a href="\1" rel="nofollow ugc" target="_blank">\1</a>', str(soup))
|
||||
clean_html = re_empty_anchor.sub(r'<a href="\1" rel="nofollow ugc" target="_blank">\1</a>', clean_html)
|
||||
|
||||
# replace lemmy's spoiler markdown left in HTML
|
||||
re_spoiler = re.compile(r':{3}\s*?spoiler\s+?(\S.+?)(?:\n|</p>)(.+?)(?:\n|<p>):{3}', re.S)
|
||||
clean_html = re_spoiler.sub(r'<details><summary>\1</summary><p>\2</p></details>', clean_html)
|
||||
|
||||
# replace strikethough markdown left in HTML
|
||||
re_strikethough = re.compile(r'~~(.*)~~')
|
||||
clean_html = re_strikethough.sub(r'<s>\1</s>', clean_html)
|
||||
|
||||
# replace subscript markdown left in HTML
|
||||
re_subscript = re.compile(r'~(.*)~')
|
||||
clean_html = re_subscript.sub(r'<sub>\1</sub>', clean_html)
|
||||
|
||||
# replace superscript markdown left in HTML
|
||||
re_superscript = re.compile(r'\^(.*)\^')
|
||||
clean_html = re_superscript.sub(r'<sup>\1</sup>', clean_html)
|
||||
|
||||
return clean_html
|
||||
|
||||
|
||||
# this is for pyfedi's version of Markdown (differs from lemmy for: newlines for soft breaks, ...)
|
||||
|
@ -280,23 +300,24 @@ def markdown_to_html(markdown_text, anchors_new_tab=True) -> str:
|
|||
if markdown_text:
|
||||
raw_html = markdown2.markdown(markdown_text, safe_mode=True,
|
||||
extras={'middle-word-em': False, 'tables': True, 'fenced-code-blocks': True, 'strike': True, 'breaks': {'on_newline': True, 'on_backslash': True}})
|
||||
# support lemmy's spoiler format
|
||||
re_spoiler = re.compile(r':{3}\s*?spoiler\s+?(\S.+?)(?:\n|</p>)(.+?)(?:\n|<p>):{3}', re.S)
|
||||
raw_html = re_spoiler.sub(r'<details><summary>\1</summary><p>\2</p></details>', raw_html)
|
||||
return allowlist_html(raw_html, a_target='_blank' if anchors_new_tab else '')
|
||||
else:
|
||||
return ''
|
||||
|
||||
|
||||
# Have started process of replacing this function, and just using Lemmy's HTML 'content' field, same as other platforms that only provide that.
|
||||
# Lemmy's MD supports line breaks as SPACE-SPACE-NEWLINE or SPACE-BACKSLASH-NEWLINE but Markdown2 can't support both: without the 'breaks'
|
||||
# extra, it doesn't translate SPACE-BACKSLASH-NEWLINE to <br />, but with it it doesn't translate SPACE-SPACE-NEWLINE to <br />
|
||||
|
||||
# done so far: post bodies (backfilled), post bodies (create), post bodies (edit), replies (create), replies (edit)
|
||||
# not done yet: user profiles, community descriptions, chat messages, over-writing with 'banned' or 'deleted by author', replies from autotl;dr bot
|
||||
|
||||
# this is for lemmy's version of Markdown (can be removed in future - when HTML from them filtered through an allow_list is used, instead of MD)
|
||||
def lemmy_markdown_to_html(markdown_text) -> str:
|
||||
if markdown_text:
|
||||
raw_html = markdown2.markdown(markdown_text, safe_mode=True, extras={'middle-word-em': False, 'tables': True,
|
||||
'fenced-code-blocks': True, 'strike': True,
|
||||
'breaks': {'on_newline': False, 'on_backslash': True}})
|
||||
# replace lemmy spoiler tokens with appropriate html tags instead.
|
||||
re_spoiler = re.compile(r':{3}\s*?spoiler\s+?(\S.+?)(?:\n|</p>)(.+?)(?:\n|<p>):{3}', re.S)
|
||||
raw_html = re_spoiler.sub(r'<details><summary>\1</summary><p>\2</p></details>', raw_html)
|
||||
return allowlist_html(raw_html)
|
||||
else:
|
||||
return ''
|
||||
|
|
Loading…
Reference in a new issue