mirror of
https://codeberg.org/rimu/pyfedi
synced 2025-01-23 19:36:56 -08:00
Initial support for incoming Microblog posts
This commit is contained in:
parent
1b1b126bf9
commit
484d165f47
2 changed files with 45 additions and 6 deletions
|
@ -25,7 +25,7 @@ import pytesseract
|
|||
from app.utils import get_request, allowlist_html, html_to_markdown, get_setting, ap_datetime, markdown_to_html, \
|
||||
is_image_url, domain_from_url, gibberish, ensure_directory_exists, markdown_to_text, head_request, post_ranking, \
|
||||
shorten_string, reply_already_exists, reply_is_just_link_to_gif_reaction, confidence, remove_tracking_from_link, \
|
||||
blocked_phrases
|
||||
blocked_phrases, microblog_content_to_title
|
||||
|
||||
|
||||
def public_key():
|
||||
|
@ -1295,11 +1295,17 @@ def create_post(activity_log: ActivityPubLog, community: Community, request_json
|
|||
activity_log.exception_message = 'Community is local only, post discarded'
|
||||
activity_log.result = 'ignored'
|
||||
return None
|
||||
if 'name' not in request_json['object']: # Microblog posts sometimes get Announced by lemmy. They don't have a title, so we can't use them.
|
||||
return None
|
||||
nsfl_in_title = '[NSFL]' in request_json['object']['name'].upper() or '(NSFL)' in request_json['object']['name'].upper()
|
||||
if 'name' not in request_json['object']: # Microblog posts
|
||||
if 'content' in request_json['object'] and request_json['object']['content'] is not None:
|
||||
name = "[Microblog]"
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
name = request_json['object']['name']
|
||||
|
||||
nsfl_in_title = '[NSFL]' in name.upper() or '(NSFL)' in name.upper()
|
||||
post = Post(user_id=user.id, community_id=community.id,
|
||||
title=html.unescape(request_json['object']['name']),
|
||||
title=html.unescape(name),
|
||||
comments_enabled=request_json['object']['commentsEnabled'] if 'commentsEnabled' in request_json['object'] else True,
|
||||
sticky=request_json['object']['stickied'] if 'stickied' in request_json['object'] else False,
|
||||
nsfw=request_json['object']['sensitive'] if 'sensitive' in request_json['object'] else False,
|
||||
|
@ -1321,6 +1327,11 @@ def create_post(activity_log: ActivityPubLog, community: Community, request_json
|
|||
elif 'content' in request_json['object'] and request_json['object']['content'] is not None: # Kbin
|
||||
post.body_html = allowlist_html(request_json['object']['content'])
|
||||
post.body = html_to_markdown(post.body_html)
|
||||
if name == "[Microblog]":
|
||||
name += ' ' + microblog_content_to_title(post.body_html)
|
||||
if '[NSFL]' in name.upper() or '(NSFL)' in name.upper():
|
||||
post.nsfl = True
|
||||
post.title = name
|
||||
# Discard post if it contains certain phrases. Good for stopping spam floods.
|
||||
blocked_phrases_list = blocked_phrases()
|
||||
for blocked_phrase in blocked_phrases_list:
|
||||
|
@ -1333,7 +1344,10 @@ def create_post(activity_log: ActivityPubLog, community: Community, request_json
|
|||
if 'attachment' in request_json['object'] and len(request_json['object']['attachment']) > 0 and \
|
||||
'type' in request_json['object']['attachment'][0]:
|
||||
if request_json['object']['attachment'][0]['type'] == 'Link':
|
||||
post.url = request_json['object']['attachment'][0]['href']
|
||||
post.url = request_json['object']['attachment'][0]['href'] # Lemmy
|
||||
if request_json['object']['attachment'][0]['type'] == 'Document':
|
||||
post.url = request_json['object']['attachment'][0]['url'] # Mastodon
|
||||
if post.url:
|
||||
if is_image_url(post.url):
|
||||
post.type = POST_TYPE_IMAGE
|
||||
if 'image' in request_json['object'] and 'url' in request_json['object']['image']:
|
||||
|
|
25
app/utils.py
25
app/utils.py
|
@ -262,6 +262,31 @@ def markdown_to_text(markdown_text) -> str:
|
|||
return markdown_text.replace("# ", '')
|
||||
|
||||
|
||||
def microblog_content_to_title(html: str) -> str:
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
|
||||
title_found = False
|
||||
for tag in soup.find_all():
|
||||
if tag.name == 'p':
|
||||
if not title_found:
|
||||
title_found = True
|
||||
continue
|
||||
else:
|
||||
tag = tag.extract()
|
||||
|
||||
if title_found:
|
||||
result = soup.text
|
||||
if len(result) > 150:
|
||||
for i in range(149, -1, -1):
|
||||
if result[i] == ' ':
|
||||
break;
|
||||
result = result[:i] + ' ...' if i > 0 else ''
|
||||
else:
|
||||
result = ''
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def domain_from_url(url: str, create=True) -> Domain:
|
||||
parsed_url = urlparse(url.lower().replace('www.', ''))
|
||||
if parsed_url and parsed_url.hostname:
|
||||
|
|
Loading…
Reference in a new issue