from __future__ import annotations

import bisect
import hashlib
import mimetypes
import random
import urllib
from collections import defaultdict
from datetime import datetime, timedelta, date
from time import sleep
from typing import List, Literal, Union

import httpx
import markdown2
from urllib.parse import urlparse, parse_qs, urlencode
from functools import wraps
import flask
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
import warnings
import jwt

from app.constants import DOWNVOTE_ACCEPT_ALL, DOWNVOTE_ACCEPT_TRUSTED, DOWNVOTE_ACCEPT_INSTANCE, \
    DOWNVOTE_ACCEPT_MEMBERS

warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
import os
from furl import furl
from flask import current_app, json, redirect, url_for, request, make_response, Response, g, flash
from flask_babel import _
from flask_login import current_user, logout_user
from sqlalchemy import text, or_
from sqlalchemy.orm import Session
from wtforms.fields import SelectField, SelectMultipleField
from wtforms.widgets import Select, html_params, ListWidget, CheckboxInput
from app import db, cache, httpx_client, celery
import re
from PIL import Image, ImageOps

from app.models import Settings, Domain, Instance, BannedInstances, User, Community, DomainBlock, ActivityPubLog, IpBan, \
    Site, Post, PostReply, utcnow, Filter, CommunityMember, InstanceBlock, CommunityBan, Topic, UserBlock, Language, \
    File, ModLog, CommunityBlock

# Flask's render_template function, with support for themes added
def render_template(template_name: str, **context) -> Response:
    theme = current_theme()
    if theme != '' and os.path.exists(f'app/templates/themes/{theme}/{template_name}'):
        content = flask.render_template(f'themes/{theme}/{template_name}', **context)
    else:
        content = flask.render_template(template_name, **context)

    # Browser caching using ETags and Cache-Control
    resp = make_response(content)
    if current_user.is_anonymous:
        if 'etag' in context:
            resp.headers.add_header('ETag', context['etag'])
        resp.headers.add_header('Cache-Control', 'no-cache, max-age=600, must-revalidate')
    return resp


def request_etag_matches(etag):
    if 'If-None-Match' in request.headers:
        old_etag = request.headers['If-None-Match']
        return old_etag == etag
    return False


def return_304(etag, content_type=None):
    resp = make_response('', 304)
    resp.headers.add_header('ETag', request.headers['If-None-Match'])
    resp.headers.add_header('Cache-Control', 'no-cache, max-age=600, must-revalidate')
    resp.headers.add_header('Vary', 'Accept, Cookie, Accept-Language')
    if content_type:
        resp.headers.set('Content-Type', content_type)
    return resp


# Jinja: when a file was modified. Useful for cache-busting
def getmtime(filename):
    if os.path.exists('static/' + filename):
        return os.path.getmtime('static/' + filename)

# do a GET request to a uri, return the result
def get_request(uri, params=None, headers=None) -> httpx.Response:
    timeout = 15 if 'washingtonpost.com' in uri else 5  # Washington Post is really slow on og:image for some reason
    if headers is None:
        headers = {'User-Agent': f'PieFed/1.0; +https://{current_app.config["SERVER_NAME"]}'}
    else:
        headers.update({'User-Agent': f'PieFed/1.0; +https://{current_app.config["SERVER_NAME"]}'})
    if params and '/webfinger' in uri:
        payload_str = urllib.parse.urlencode(params, safe=':@')
    else:
        payload_str = urllib.parse.urlencode(params) if params else None
    try:
        response = httpx_client.get(uri, params=payload_str, headers=headers, timeout=timeout, follow_redirects=True)
    except ValueError as ex:
        # Convert to a more generic error we handle
        raise httpx.HTTPError(f"HTTPError: {str(ex)}") from None
    except httpx.ReadError as connection_error:
        try:  # retry, this time with a longer timeout
            sleep(random.randint(3, 10))
            response = httpx_client.get(uri, params=payload_str, headers=headers, timeout=timeout * 2, follow_redirects=True)
        except Exception as e:
            current_app.logger.info(f"{uri} {connection_error}")
            # ReadError lives on the httpx module, not on the client instance
            raise httpx.ReadError(f"HTTPReadError: {str(e)}") from connection_error
    except httpx.HTTPError as read_timeout:
        try:  # retry, this time with a longer timeout
            sleep(random.randint(3, 10))
            response = httpx_client.get(uri, params=payload_str, headers=headers, timeout=timeout * 2, follow_redirects=True)
        except Exception as e:
            current_app.logger.info(f"{uri} {read_timeout}")
            raise httpx.HTTPError(f"HTTPError: {str(e)}") from read_timeout

    return response

# Same as get_request except updates instance on failure and does not raise any exceptions
def get_request_instance(uri, instance: Instance, params=None, headers=None) -> httpx.Response:
    try:
        return get_request(uri, params, headers)
    except:
        instance.failures += 1
        instance.update_dormant_gone()
        db.session.commit()
        return httpx.Response(status_code=500)

# do a HEAD request to a uri, return the result
def head_request(uri, params=None, headers=None) -> httpx.Response:
    if headers is None:
        headers = {'User-Agent': f'PieFed/1.0; +https://{current_app.config["SERVER_NAME"]}'}
    else:
        headers.update({'User-Agent': f'PieFed/1.0; +https://{current_app.config["SERVER_NAME"]}'})
    try:
        # httpx uses follow_redirects (allow_redirects is the requests-library spelling)
        response = httpx_client.head(uri, params=params, headers=headers, timeout=5, follow_redirects=True)
    except httpx.HTTPError as er:
        current_app.logger.info(f"{uri} {er}")
        raise httpx.HTTPError(f"HTTPError: {str(er)}") from er

    return response

# Retrieves an arbitrary object from the persistent key-value store. Cached.
# Similar to g.site.* except g.site.* is populated on every single page load so g.site is best for settings that are
# accessed very often (e.g. every page load)
@cache.memoize(timeout=50)
def get_setting(name: str, default=None):
    setting = Settings.query.filter_by(name=name).first()
    if setting is None:
        return default
    else:
        return json.loads(setting.value)


# Saves an arbitrary object into the persistent key-value store
def set_setting(name: str, value):
    setting = Settings.query.filter_by(name=name).first()
    if setting is None:
        db.session.add(Settings(name=name, value=json.dumps(value)))
    else:
        setting.value = json.dumps(value)
    db.session.commit()
    cache.delete_memoized(get_setting)

# Return the contents of a file as a string. Inspired by PHP's function of the same name.
def file_get_contents(filename):
    with open(filename, 'r') as file:
        contents = file.read()
    return contents


random_chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'


def gibberish(length: int = 10) -> str:
    return "".join([random.choice(random_chars) for x in range(length)])


# used by @cache.cached() for home page and post caching
def make_cache_key(sort=None, post_id=None, view_filter=None):
    if current_user.is_anonymous:
        return f'{request.url}_{sort}_{post_id}_anon_{request.headers.get("Accept")}_{request.headers.get("Accept-Language")}'  # The Accept header differentiates between activitypub requests and everything else
    else:
        return f'{request.url}_{sort}_{post_id}_user_{current_user.id}'


def is_image_url(url):
    common_image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp', '.avif', '.svg+xml', '.svg+xml; charset=utf-8']
    mime_type = mime_type_using_head(url)
    if mime_type:
        mime_type_parts = mime_type.split('/')
        return f'.{mime_type_parts[1]}' in common_image_extensions
    else:
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        return any(path.endswith(extension) for extension in common_image_extensions)


def is_local_image_url(url):
    if not is_image_url(url):
        return False
    f = furl(url)
    return f.host in ["127.0.0.1", current_app.config["SERVER_NAME"]]


def is_video_url(url: str) -> bool:
    common_video_extensions = ['.mp4', '.webm']
    mime_type = mime_type_using_head(url)
    if mime_type:
        mime_type_parts = mime_type.split('/')
        return f'.{mime_type_parts[1]}' in common_video_extensions
    else:
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        return any(path.endswith(extension) for extension in common_video_extensions)


def is_video_hosting_site(url: str) -> bool:
    if url is None or url == '':
        return False
    video_hosting_sites = ['https://youtube.com', 'https://www.youtube.com', 'https://youtu.be', 'https://www.vimeo.com', 'https://www.redgifs.com/watch/']
    for starts_with in video_hosting_sites:
        if url.startswith(starts_with):
            return True

    if 'videos/watch' in url:  # PeerTube
        return True

    return False


@cache.memoize(timeout=10)
def mime_type_using_head(url):
    # Find the mime type of a url by doing a HEAD request - this is the same as GET except only the HTTP headers are transferred
    try:
        response = httpx_client.head(url, timeout=5)
        response.raise_for_status()  # Raise an exception for HTTP errors
        content_type = response.headers.get('Content-Type')
        if content_type:
            if content_type == 'application/octet-stream':
                return ''
            return content_type
        else:
            return ''
    except httpx.HTTPError:
        return ''

# sanitise HTML using an allow list
def allowlist_html(html: str, a_target='_blank') -> str:
    if html is None or html == '':
        return ''
    allowed_tags = ['p', 'strong', 'a', 'ul', 'ol', 'li', 'em', 'blockquote', 'cite', 'br', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'pre',
                    'code', 'img', 'details', 'summary', 'table', 'tr', 'td', 'th', 'tbody', 'thead', 'hr', 'span', 'small', 'sub', 'sup',
                    's']
    # Parse the HTML using BeautifulSoup
    soup = BeautifulSoup(html, 'html.parser')

    # Find all plain text links, convert to <a> tags
    re_url = re.compile(r'(http[s]?://[!-~]+)')  # http(s):// followed by chars in ASCII range 33 to 126
    for tag in soup.find_all(text=True):
        tags = []
        url = False
        for t in re_url.split(tag.string):
            if re_url.match(t):
                # Avoid picking up trailing punctuation for raw URLs in text
                href = t[:-1] if t[-1] in ['.', ',', ')', '!', ':', ';', '?'] else t
                a = soup.new_tag("a", href=href)
                a.string = href
                tags.append(a)
                if href != t:
                    tags.append(t[-1])
                url = True
            else:
                tags.append(t)
        if url:
            for t in tags:
                tag.insert_before(t)
            tag.extract()

    # Filter tags, leaving only safe ones
    for tag in soup.find_all():
        # If the tag is not in the allowed_tags list, remove it and its contents
        if tag.name not in allowed_tags:
            tag.extract()
        else:
            # Filter and sanitize attributes
            for attr in list(tag.attrs):
                if attr not in ['href', 'src', 'alt', 'class']:
                    del tag[attr]
            # Remove some mastodon guff - spans with class "invisible"
            if tag.name == 'span' and 'class' in tag.attrs and 'invisible' in tag.attrs['class']:
                tag.extract()
            # Add nofollow and target=_blank to anchors
            if tag.name == 'a':
                tag.attrs['rel'] = 'nofollow ugc'
                tag.attrs['target'] = a_target
            # Add loading=lazy to images
            if tag.name == 'img':
                tag.attrs['loading'] = 'lazy'
            if tag.name == 'table':
                tag.attrs['class'] = 'table'

    clean_html = str(soup)

    # avoid wrapping anchors around existing anchors (e.g. if raw URL already wrapped by remote PieFed instance)
    re_double_anchor = re.compile(r'<a href=".*?" rel="nofollow ugc" target="_blank">(<a href=".*?" rel="nofollow ugc" target="_blank">.*?<\/a>)<\/a>')
    clean_html = re_double_anchor.sub(r'\1', clean_html)

    # avoid returning empty anchors
    re_empty_anchor = re.compile(r'<a href="(.*?)" rel="nofollow ugc" target="_blank"><\/a>')
    clean_html = re_empty_anchor.sub(r'<a href="\1" rel="nofollow ugc" target="_blank">\1</a>', clean_html)

    # replace lemmy's spoiler markdown left in HTML
    re_spoiler = re.compile(r':{3}\s*?spoiler\s+?(\S.+?)(?:\n|</p>)(.+?)(?:\n|<p>):{3}', re.S)
    clean_html = re_spoiler.sub(r'<details><summary>\1</summary><p>\2</p></details>', clean_html)

    # replace strikethrough markdown left in HTML
    re_strikethrough = re.compile(r'~~(.*)~~')
    clean_html = re_strikethrough.sub(r'<s>\1</s>', clean_html)

    # replace subscript markdown left in HTML
    re_subscript = re.compile(r'~(\S+)~')
    clean_html = re_subscript.sub(r'<sub>\1</sub>', clean_html)

    # replace superscript markdown left in HTML
    re_superscript = re.compile(r'\^(\S+)\^')
    clean_html = re_superscript.sub(r'<sup>\1</sup>', clean_html)

    # replace <img src> for mp4 with <video> - treat them like a GIF (autoplay, but initially muted)
    re_embedded_mp4 = re.compile(r'<img .*?src="(https://.*?\.mp4)".*?/>')
    clean_html = re_embedded_mp4.sub(r'<video class="responsive-video" controls preload="auto" autoplay muted loop playsinline disablepictureinpicture><source src="\1" type="video/mp4"></video>', clean_html)

    # replace <img src> for webm with <video> - treat them like a GIF (autoplay, but initially muted)
    re_embedded_webm = re.compile(r'<img .*?src="(https://.*?\.webm)".*?/>')
    clean_html = re_embedded_webm.sub(r'<video class="responsive-video" controls preload="auto" autoplay muted loop playsinline disablepictureinpicture><source src="\1" type="video/webm"></video>', clean_html)

    # replace <img src> for mp3 with <audio>
    re_embedded_mp3 = re.compile(r'<img .*?src="(https://.*?\.mp3)".*?/>')
    clean_html = re_embedded_mp3.sub(r'<audio controls><source src="\1" type="audio/mp3"></audio>', clean_html)

    # replace the 'static' for images hotlinked to fandom sites with 'vignette'
    re_fandom_hotlink = re.compile(r'<img alt="(.*?)" loading="lazy" src="https://static.wikia.nocookie.net')
    clean_html = re_fandom_hotlink.sub(r'<img alt="\1" loading="lazy" src="https://vignette.wikia.nocookie.net', clean_html)

    return clean_html

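# Illustrative example of allowlist_html() behaviour (assumed output, not from a test suite):
#   allowlist_html('<p onclick="x()">hi <script>alert(1)</script></p>')
# strips the disallowed onclick attribute, removes the <script> element and its contents
# entirely, and returns roughly '<p>hi </p>'.
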
# use this for Markdown irrespective of origin, as it can deal with both soft break newlines ('\n' used by PieFed) and hard break newlines (' \n' or ' \\n')
# ' \\n' will create <br /><br /> instead of just <br />, but hopefully that's acceptable.
def markdown_to_html(markdown_text, anchors_new_tab=True) -> str:
    if markdown_text:
        raw_html = markdown2.markdown(markdown_text,
                                      extras={'middle-word-em': False, 'tables': True, 'fenced-code-blocks': True, 'strike': True, 'breaks': {'on_newline': True, 'on_backslash': True}})
        return allowlist_html(raw_html, a_target='_blank' if anchors_new_tab else '')
    else:
        return ''

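# Illustrative example (assumed output): markdown_to_html('**bold** text') produces
# roughly '<p><strong>bold</strong> text</p>' after markdown2 conversion and
# allowlist_html() sanitisation.
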
# this function lets local users use the more intuitive soft-breaks for newlines, but actually stores the Markdown in Lemmy-compatible format
# Reasons for this:
# 1. it's what any adapted Lemmy apps using an API would expect
# 2. we've reverted to sending out Markdown in 'source' because:
#    a. Lemmy doesn't convert '<details><summary>' back into its '::: spoiler' format
#    b. anything coming from another PieFed instance would get reduced with html_to_text()
#    c. raw 'https' strings in code blocks are being converted into <a> links for HTML that Lemmy then converts back into []()
def piefed_markdown_to_lemmy_markdown(piefed_markdown: str):
    # only difference is newlines for soft breaks.
    re_breaks = re.compile(r'(\S)(\r\n)')
    # two trailing spaces before the newline make a Markdown hard break
    lemmy_markdown = re_breaks.sub(r'\1  \2', piefed_markdown)
    return lemmy_markdown

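# Illustrative example (assumed whitespace): piefed_markdown_to_lemmy_markdown('line one\r\nline two')
# returns 'line one  \r\nline two' - two trailing spaces are appended before the newline so
# that Lemmy renders a hard <br /> break.
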
def markdown_to_text(markdown_text) -> str:
    if not markdown_text or markdown_text == '':
        return ''
    return markdown_text.replace("# ", '')


def html_to_text(html) -> str:
    if html is None or html == '':
        return ''
    soup = BeautifulSoup(html, 'html.parser')
    return soup.get_text()


def mastodon_extra_field_link(extra_field: str) -> str:
    soup = BeautifulSoup(extra_field, 'html.parser')
    for tag in soup.find_all('a'):
        return tag['href']

def microblog_content_to_title(html: str) -> str:
    title = ''
    if '<p>' in html:
        soup = BeautifulSoup(html, 'html.parser')
        for tag in soup.find_all('p'):
            title = tag.get_text(separator=" ")
            if title and title.strip() != '' and len(title.strip()) >= 5:
                break
    else:
        title = html_to_text(html)

    period_index = title.find('.')
    question_index = title.find('?')
    exclamation_index = title.find('!')

    # Find the earliest occurrence of either '.' or '?' or '!'
    end_index = min(period_index if period_index != -1 else float('inf'),
                    question_index if question_index != -1 else float('inf'),
                    exclamation_index if exclamation_index != -1 else float('inf'))

    # there's no recognised punctuation
    if end_index == float('inf'):
        if len(title) >= 10:
            title = title.replace(' @ ', '').replace(' # ', '')
            title = shorten_string(title, 197)
        else:
            title = '(content in post body)'
        return title.strip()

    if end_index != -1:
        if question_index != -1 and question_index == end_index:
            end_index += 1  # Add the ? back on
        if exclamation_index != -1 and exclamation_index == end_index:
            end_index += 1  # Add the ! back on
        title = title[:end_index]

    if len(title) > 150:
        for i in range(149, -1, -1):
            if title[i] == ' ':
                break
        title = title[:i] + ' ...' if i > 0 else ''

    return title.strip()

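# Illustrative example (assumed output): microblog_content_to_title('<p>Hello world. More text here</p>')
# returns 'Hello world' - the text is cut at the first '.', '?' or '!', and only '?' and '!'
# are kept as part of the title.
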
def first_paragraph(html):
    soup = BeautifulSoup(html, 'html.parser')
    first_para = soup.find('p')
    if first_para:
        if first_para.text.strip() == 'Summary' or \
                first_para.text.strip() == '*Summary*' or \
                first_para.text.strip() == 'Comments' or \
                first_para.text.lower().startswith('cross-posted from:'):
            second_paragraph = first_para.find_next('p')
            if second_paragraph:
                return f'<p>{second_paragraph.text}</p>'
        return f'<p>{first_para.text}</p>'
    else:
        return ''

def community_link_to_href(link: str) -> str:
    pattern = r"!([a-zA-Z0-9_.-]*)@([a-zA-Z0-9_.-]*)\b"
    # quote the href attribute, matching person_link_to_href() below (it was previously unquoted)
    server = r'<a href="https://' + current_app.config['SERVER_NAME'] + r'/community/lookup/'
    return re.sub(pattern, server + r'\g<1>/\g<2>">' + r'!\g<1>@\g<2></a>', link)

def person_link_to_href(link: str) -> str:
    pattern = r"@([a-zA-Z0-9_.-]*)@([a-zA-Z0-9_.-]*)\b"
    server = f'https://{current_app.config["SERVER_NAME"]}/user/lookup/'
    replacement = (r'<a href="' + server + r'\g<1>/\g<2>" rel="nofollow noindex">@\g<1>@\g<2></a>')
    return re.sub(pattern, replacement, link)

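# Illustrative example (assuming a SERVER_NAME of 'piefed.example'): person_link_to_href('hi @bob@remote.tld')
# returns 'hi <a href="https://piefed.example/user/lookup/bob/remote.tld" rel="nofollow noindex">@bob@remote.tld</a>'.
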
def domain_from_url(url: str, create=True) -> Domain:
    parsed_url = urlparse(url.lower().replace('www.', ''))
    if parsed_url and parsed_url.hostname:
        find_this = parsed_url.hostname.lower()
        if find_this == 'youtu.be':
            find_this = 'youtube.com'
        domain = Domain.query.filter_by(name=find_this).first()
        if create and domain is None:
            domain = Domain(name=find_this)
            db.session.add(domain)
            db.session.commit()
        return domain
    else:
        return None


def shorten_string(input_str, max_length=50):
    if input_str:
        if len(input_str) <= max_length:
            return input_str
        else:
            return input_str[:max_length - 3] + '…'
    else:
        return ''

def shorten_url(input: str, max_length=20):
    if input:
        # pass max_length through (it was previously ignored, silently using shorten_string's default)
        return shorten_string(input.replace('https://', '').replace('http://', ''), max_length)
    else:
        # previously this branch had a bare '' expression and fell through, returning None
        return ''

# the display width of a number once formatted by shorten_number(), e.g. 1000 ('1.0k') would be 4
def digits(input: int) -> int:
    return len(shorten_number(input))

@cache.memoize(timeout=50)
def user_access(permission: str, user_id: int) -> bool:
    has_access = db.session.execute(text('SELECT * FROM "role_permission" as rp ' +
                                         'INNER JOIN user_role ur on rp.role_id = ur.role_id ' +
                                         'WHERE ur.user_id = :user_id AND rp.permission = :permission'),
                                    {'user_id': user_id, 'permission': permission}).first()
    return has_access is not None


def role_access(permission: str, role_id: int) -> bool:
    has_access = db.session.execute(text('SELECT * FROM "role_permission" as rp ' +
                                         'WHERE rp.role_id = :role_id AND rp.permission = :permission'),
                                    {'role_id': role_id, 'permission': permission}).first()
    return has_access is not None


@cache.memoize(timeout=10)
def community_membership(user: User, community: Community) -> int:
    if community is None:
        return False
    return user.subscribed(community.id)


@cache.memoize(timeout=86400)
def communities_banned_from(user_id: int) -> List[int]:
    community_bans = CommunityBan.query.filter(CommunityBan.user_id == user_id).all()
    return [cb.community_id for cb in community_bans]


@cache.memoize(timeout=86400)
def blocked_domains(user_id) -> List[int]:
    blocks = DomainBlock.query.filter_by(user_id=user_id)
    return [block.domain_id for block in blocks]


@cache.memoize(timeout=86400)
def blocked_communities(user_id) -> List[int]:
    blocks = CommunityBlock.query.filter_by(user_id=user_id)
    return [block.community_id for block in blocks]


@cache.memoize(timeout=86400)
def blocked_instances(user_id) -> List[int]:
    blocks = InstanceBlock.query.filter_by(user_id=user_id)
    return [block.instance_id for block in blocks]


@cache.memoize(timeout=86400)
def blocked_users(user_id) -> List[int]:
    blocks = UserBlock.query.filter_by(blocker_id=user_id)
    return [block.blocked_id for block in blocks]


@cache.memoize(timeout=86400)
def blocked_phrases() -> List[str]:
    site = Site.query.get(1)
    if site.blocked_phrases:
        blocked_phrases = []
        for phrase in site.blocked_phrases.split('\n'):
            if phrase != '':
                if phrase.endswith('\r'):
                    blocked_phrases.append(phrase[:-1])
                else:
                    blocked_phrases.append(phrase)
        return blocked_phrases
    else:
        return []


@cache.memoize(timeout=86400)
def blocked_referrers() -> List[str]:
    site = Site.query.get(1)
    if site.auto_decline_referrers:
        return [referrer for referrer in site.auto_decline_referrers.split('\n') if referrer != '']
    else:
        return []

def retrieve_block_list():
    try:
        response = httpx_client.get('https://raw.githubusercontent.com/rimu/no-qanon/master/domains.txt', timeout=1)
    except:
        return None
    if response and response.status_code == 200:
        return response.text


def retrieve_peertube_block_list():
    try:
        response = httpx_client.get('https://peertube_isolation.frama.io/list/peertube_isolation.json', timeout=1)
    except:
        return None
    block_list = ''  # renamed from 'list' to avoid shadowing the builtin
    if response and response.status_code == 200:
        response_data = response.json()
        for row in response_data['data']:
            block_list += row['value'] + "\n"
    response.close()
    return block_list.strip()

def ensure_directory_exists(directory):
    parts = directory.split('/')
    rebuild_directory = ''
    for part in parts:
        rebuild_directory += part
        if not os.path.isdir(rebuild_directory):
            os.mkdir(rebuild_directory)
        rebuild_directory += '/'


def mimetype_from_url(url):
    parsed_url = urlparse(url)
    path = parsed_url.path.split('?')[0]  # Strip off anything after '?'
    mime_type, _ = mimetypes.guess_type(path)
    return mime_type


def validation_required(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if current_user.verified:
            return func(*args, **kwargs)
        else:
            return redirect(url_for('auth.validation_required'))
    return decorated_view


def permission_required(permission):
    def decorator(func):
        @wraps(func)
        def decorated_view(*args, **kwargs):
            if user_access(permission, current_user.id):
                return func(*args, **kwargs)
            else:
                # Handle the case where the user doesn't have the required permission
                return redirect(url_for('auth.permission_denied'))

        return decorated_view

    return decorator


# sends the user back to where they came from
def back(default_url):
    # Get the referrer from the request headers
    referrer = request.referrer

    # If the referrer exists and is not the same as the current request URL, redirect to the referrer
    if referrer and referrer != request.url:
        return redirect(referrer)

    # If referrer is not available or is the same as the current request URL, redirect to the default URL
    return redirect(default_url)

# format a datetime in a way that is used in ActivityPub
def ap_datetime(date_time: datetime) -> str:
    return date_time.isoformat() + '+00:00'

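# Illustrative example: ap_datetime(datetime(2025, 1, 1, 12, 30, 0)) returns
# '2025-01-01T12:30:00+00:00' - a naive UTC datetime is assumed.
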
class MultiCheckboxField(SelectMultipleField):
    widget = ListWidget(prefix_label=False)
    option_widget = CheckboxInput()


def ip_address() -> str:
    ip = request.headers.get('X-Forwarded-For') or request.remote_addr
    if ',' in ip:  # Keep only the first of multiple forwarded IP addresses
        ip = ip[:ip.index(',')].strip()
    return ip

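# Illustrative example: with an 'X-Forwarded-For: 203.0.113.5, 70.41.3.18' header,
# ip_address() returns '203.0.113.5' (the original client, not the intermediate proxy).
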
def user_ip_banned() -> bool:
    current_ip_address = ip_address()
    if current_ip_address:
        return current_ip_address in banned_ip_addresses()
    return False  # previously fell through and returned None


@cache.memoize(timeout=150)
def instance_banned(domain: str) -> bool:  # see also activitypub.util.instance_blocked()
    if domain is None or domain == '':
        return False
    domain = domain.lower().strip()
    if 'https://' in domain or 'http://' in domain:
        domain = urlparse(domain).hostname
    banned = BannedInstances.query.filter_by(domain=domain).first()
    if banned is not None:
        return True

    # Mastodon sometimes bans with a * in the domain name, meaning "any letter", e.g. "cum.**mp"
    regex_patterns = [re.compile(f"^{cond.domain.replace('*', '[a-zA-Z0-9]')}$") for cond in
                      BannedInstances.query.filter(BannedInstances.domain.like('%*%')).all()]
    return any(pattern.match(domain) for pattern in regex_patterns)

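# Illustrative example of the wildcard handling: a banned entry of 'cum.**mp' compiles to the
# regex '^cum.[a-zA-Z0-9][a-zA-Z0-9]mp$', so each '*' matches exactly one letter or digit.
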
def user_cookie_banned() -> bool:
    # 'sesion' is deliberately misspelled - it is a decoy cookie set by show_ban_message()
    cookie = request.cookies.get('sesion', None)
    return cookie is not None


@cache.memoize(timeout=30)
def banned_ip_addresses() -> List[str]:
    ips = IpBan.query.all()
    return [ip.ip_address for ip in ips]

def can_downvote(user, community: Community, site=None) -> bool:
    if user is None or community is None or user.banned or user.bot:
        return False

    if site is None:
        try:
            site = g.site
        except:
            site = Site.query.get(1)

    if not site.enable_downvotes:
        return False

    if community.local_only and not user.is_local():
        return False

    if (user.attitude and user.attitude < -0.40) or user.reputation < -10:  # this should exclude about 3.7% of users.
        return False

    if community.downvote_accept_mode != DOWNVOTE_ACCEPT_ALL:
        if community.downvote_accept_mode == DOWNVOTE_ACCEPT_MEMBERS:
            if not community.is_member(user):
                return False
        elif community.downvote_accept_mode == DOWNVOTE_ACCEPT_INSTANCE:
            if user.instance_id != community.instance_id:
                return False
        elif community.downvote_accept_mode == DOWNVOTE_ACCEPT_TRUSTED:
            if community.instance_id == user.instance_id:
                pass
            else:
                if user.instance_id not in trusted_instance_ids():
                    return False

    if community.id in communities_banned_from(user.id):
        return False

    return True


def can_upvote(user, community: Community) -> bool:
    if user is None or community is None or user.banned or user.bot:
        return False

    if community.id in communities_banned_from(user.id):
        return False

    return True


def can_create_post(user, content: Community) -> bool:
    if user is None or content is None or user.banned:
        return False

    if user.ban_posts:
        return False

    if content.is_moderator(user) or user.is_admin():
        return True

    if content.restricted_to_mods:
        return False

    if content.local_only and not user.is_local():
        return False

    if content.id in communities_banned_from(user.id):
        return False

    return True


def can_create_post_reply(user, content: Community) -> bool:
    if user is None or content is None or user.banned:
        return False

    if user.ban_comments:
        return False

    if content.is_moderator(user) or user.is_admin():
        return True

    if content.local_only and not user.is_local():
        return False

    if content.id in communities_banned_from(user.id):
        return False

    return True

def reply_already_exists(user_id, post_id, parent_id, body) -> bool:
    if parent_id is None:
        num_matching_replies = db.session.execute(text(
            'SELECT COUNT(id) as c FROM "post_reply" WHERE deleted is false and user_id = :user_id AND post_id = :post_id AND parent_id is null AND body = :body'),
            {'user_id': user_id, 'post_id': post_id, 'body': body}).scalar()
    else:
        num_matching_replies = db.session.execute(text(
            'SELECT COUNT(id) as c FROM "post_reply" WHERE deleted is false and user_id = :user_id AND post_id = :post_id AND parent_id = :parent_id AND body = :body'),
            {'user_id': user_id, 'post_id': post_id, 'parent_id': parent_id, 'body': body}).scalar()
    return num_matching_replies != 0


def reply_is_just_link_to_gif_reaction(body) -> bool:
    tmp_body = body.strip()
    # str.startswith accepts a tuple, which reads better than a chain of 'or' clauses
    return tmp_body.startswith((
        'https://media.tenor.com/',
        'https://media1.tenor.com/',
        'https://media2.tenor.com/',
        'https://media3.tenor.com/',
        'https://i.giphy.com/',
        'https://i.imgflip.com',
        'https://media1.giphy.com/',
        'https://media2.giphy.com/',
        'https://media3.giphy.com/',
        'https://media4.giphy.com/'))


def reply_is_stupid(body) -> bool:
    lower_body = body.lower().strip()
    if lower_body == 'this' or lower_body == 'this.' or lower_body == 'this!':
        return True
    return False


@cache.memoize(timeout=10)
def trusted_instance_ids() -> List[int]:
    return [instance.id for instance in Instance.query.filter(Instance.trusted == True)]


def inbox_domain(inbox: str) -> str:
    inbox = inbox.lower()
    if 'https://' in inbox or 'http://' in inbox:
        inbox = urlparse(inbox).hostname
    return inbox


def awaken_dormant_instance(instance):
    if instance and not instance.gone_forever:
        if instance.dormant:
            if instance.start_trying_again is None:
                instance.start_trying_again = utcnow() + timedelta(seconds=instance.failures ** 4)
                db.session.commit()
            else:
                if instance.start_trying_again < utcnow():
                    instance.dormant = False
                    db.session.commit()
        # give up after ~5 days of trying
        if instance.start_trying_again and utcnow() + timedelta(days=5) < instance.start_trying_again:
            instance.gone_forever = True
            instance.dormant = True
            db.session.commit()

def shorten_number(number):
    if number < 1000:
        return str(number)
    elif number < 1000000:
        return f'{number / 1000:.1f}k'
    else:
        return f'{number / 1000000:.1f}M'

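# Illustrative examples: shorten_number(950) -> '950', shorten_number(1500) -> '1.5k',
# shorten_number(2300000) -> '2.3M'. digits() above relies on the length of these strings.
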
@cache.memoize(timeout=300)
def user_filters_home(user_id):
    filters = Filter.query.filter_by(user_id=user_id, filter_home=True).filter(or_(Filter.expire_after > date.today(), Filter.expire_after == None))
    result = defaultdict(set)
    for filter in filters:
        keywords = [keyword.strip().lower() for keyword in filter.keywords.splitlines()]
        if filter.hide_type == 0:
            result[filter.title].update(keywords)
        else:  # type == 1 means hide completely. These posts are excluded from output by the jinja template
            result['-1'].update(keywords)
    return result


@cache.memoize(timeout=300)
def user_filters_posts(user_id):
    filters = Filter.query.filter_by(user_id=user_id, filter_posts=True).filter(or_(Filter.expire_after > date.today(), Filter.expire_after == None))
    result = defaultdict(set)
    for filter in filters:
        keywords = [keyword.strip().lower() for keyword in filter.keywords.splitlines()]
        if filter.hide_type == 0:
            result[filter.title].update(keywords)
        else:
            result['-1'].update(keywords)
    return result


@cache.memoize(timeout=300)
def user_filters_replies(user_id):
    filters = Filter.query.filter_by(user_id=user_id, filter_replies=True).filter(or_(Filter.expire_after > date.today(), Filter.expire_after == None))
    result = defaultdict(set)
    for filter in filters:
        keywords = [keyword.strip().lower() for keyword in filter.keywords.splitlines()]
        if filter.hide_type == 0:
            result[filter.title].update(keywords)
        else:
            result['-1'].update(keywords)
    return result


@cache.memoize(timeout=300)
def moderating_communities(user_id):
    if user_id is None or user_id == 0:
        return []
    return Community.query.join(CommunityMember, Community.id == CommunityMember.community_id).\
        filter(Community.banned == False).\
        filter(or_(CommunityMember.is_moderator == True, CommunityMember.is_owner == True)). \
        filter(CommunityMember.is_banned == False). \
        filter(CommunityMember.user_id == user_id).order_by(Community.title).all()


@cache.memoize(timeout=300)
def joined_communities(user_id):
    if user_id is None or user_id == 0:
        return []
    return Community.query.join(CommunityMember, Community.id == CommunityMember.community_id).\
        filter(Community.banned == False). \
        filter(CommunityMember.is_moderator == False, CommunityMember.is_owner == False). \
        filter(CommunityMember.is_banned == False). \
        filter(CommunityMember.user_id == user_id).order_by(Community.title).all()


@cache.memoize(timeout=3000)
def menu_topics():
    return Topic.query.filter(Topic.parent_id == None).order_by(Topic.name).all()


@cache.memoize(timeout=300)
def community_moderators(community_id):
    return CommunityMember.query.filter((CommunityMember.community_id == community_id) &
                                        (or_(
                                            CommunityMember.is_owner,
                                            CommunityMember.is_moderator
                                        ))
                                        ).all()

def finalize_user_setup(user):
    from app.activitypub.signature import RsaKeys
    user.verified = True
    user.last_seen = utcnow()
    if user.private_key is None and user.public_key is None:
        private_key, public_key = RsaKeys.generate_keypair()
        user.private_key = private_key
        user.public_key = public_key
    user.ap_profile_id = f"https://{current_app.config['SERVER_NAME']}/u/{user.user_name}".lower()
    user.ap_public_url = f"https://{current_app.config['SERVER_NAME']}/u/{user.user_name}"
    user.ap_inbox_url = f"https://{current_app.config['SERVER_NAME']}/u/{user.user_name.lower()}/inbox"
    db.session.commit()


def notification_subscribers(entity_id: int, entity_type: int) -> List[int]:
    return list(db.session.execute(text('SELECT user_id FROM "notification_subscription" WHERE entity_id = :entity_id AND type = :type '),
                                   {'entity_id': entity_id, 'type': entity_type}).scalars())


# topics, in a tree
def topic_tree() -> List:
    topics = Topic.query.order_by(Topic.name)

    topics_dict = {topic.id: {'topic': topic, 'children': []} for topic in topics.all()}

    for topic in topics:
        if topic.parent_id is not None:
            parent_comment = topics_dict.get(topic.parent_id)
            if parent_comment:
                parent_comment['children'].append(topics_dict[topic.id])

    return [topic for topic in topics_dict.values() if topic['topic'].parent_id is None]

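# Illustrative shape of the result (assumed Topic ids): a root topic with one child comes back as
# [{'topic': <Topic 1>, 'children': [{'topic': <Topic 2>, 'children': []}]}, ...] - only root
# topics appear at the top level, with descendants nested under 'children'.
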
def opengraph_parse(url):
    if '?' in url:
        url = url.split('?')[0]
    try:
        return parse_page(url)
    except Exception:
        return None

def url_to_thumbnail_file(filename) -> File:
    try:
        timeout = 15 if 'washingtonpost.com' in filename else 5  # Washington Post is really slow for some reason
        response = httpx_client.get(filename, timeout=timeout)
    except:
        return None
    if response.status_code == 200:
        content_type = response.headers.get('content-type')
        if content_type and content_type.startswith('image'):
            # Generate file extension from mime type
            content_type_parts = content_type.split('/')
            if content_type_parts:
                file_extension = '.' + content_type_parts[-1]
                if file_extension == '.jpeg':
                    file_extension = '.jpg'
            else:
                file_extension = os.path.splitext(filename)[1]
                file_extension = file_extension.replace('%3f', '?')  # sometimes urls are not decoded properly
                if '?' in file_extension:
                    file_extension = file_extension.split('?')[0]

            new_filename = gibberish(15)
            directory = 'app/static/media/posts/' + new_filename[0:2] + '/' + new_filename[2:4]
            ensure_directory_exists(directory)
            final_place = os.path.join(directory, new_filename + file_extension)
            with open(final_place, 'wb') as f:
                f.write(response.content)
            response.close()
            Image.MAX_IMAGE_PIXELS = 89478485
            with Image.open(final_place) as img:
                img = ImageOps.exif_transpose(img)
                img.thumbnail((170, 170))
                img.save(final_place)
                thumbnail_width = img.width
                thumbnail_height = img.height
            return File(file_name=new_filename + file_extension, thumbnail_width=thumbnail_width,
                        thumbnail_height=thumbnail_height, thumbnail_path=final_place,
                        source_url=filename)


# By no means is this a complete list, but it is very easy to search for the ones you need later.
KNOWN_OPENGRAPH_TAGS = [
    "og:site_name",
    "og:title",
    "og:locale",
    "og:type",
    "og:image",
    "og:url",
    "og:image:url",
    "og:image:secure_url",
    "og:image:type",
    "og:image:width",
    "og:image:height",
    "og:image:alt",
]

def parse_page(page_url, tags_to_search=KNOWN_OPENGRAPH_TAGS, fallback_tags=None):
    '''
    Parses a page and returns a JSON-style dictionary of all OG tags found on that page.

    Passing in tags_to_search is optional. By default it will search through the KNOWN_OPENGRAPH_TAGS constant, but for the sake of efficiency you may want to only search for 1 or 2 tags.

    Returns False if the page is unreadable.
    '''
    # read the html from the page
    response = get_request(page_url)

    if response.status_code != 200:
        return False

    # set up beautiful soup
    soup = BeautifulSoup(response.content, 'html.parser')

    # loop through the known list of opengraph tags, searching for each and appending to a dictionary as we go.
    found_tags = {}

    for og_tag in tags_to_search:
        new_found_tag = soup.find("meta", property=og_tag)
        if new_found_tag is not None:
            found_tags[new_found_tag["property"]] = new_found_tag["content"]
        elif fallback_tags is not None and og_tag in fallback_tags:
            found_tags[og_tag] = soup.find(fallback_tags[og_tag]).text

    return found_tags

def current_theme():
    """ The theme the current user has set, falling back to the site default if none specified or user is not logged in """
    if hasattr(g, 'site'):
        site = g.site
    else:
        site = Site.query.get(1)
    if current_user.is_authenticated:
        if current_user.theme is not None and current_user.theme != '':
            return current_user.theme
        else:
            return site.default_theme if site.default_theme is not None else ''
    else:
        return site.default_theme if site.default_theme is not None else ''


def theme_list():
    """ All the themes available, by looking in the templates/themes directory """
    result = [('', 'PieFed')]
    for root, dirs, files in os.walk('app/templates/themes'):
        for dir in dirs:
            if os.path.exists(f'app/templates/themes/{dir}/{dir}.json'):
                theme_settings = json.loads(file_get_contents(f'app/templates/themes/{dir}/{dir}.json'))
                result.append((dir, theme_settings['name']))
    return result

def sha256_digest(input_string):
    """
    Compute the SHA-256 hash digest of a given string.

    Args:
    - input_string: The string to compute the hash digest for.

    Returns:
    - A hexadecimal string representing the SHA-256 hash digest.
    """
    sha256_hash = hashlib.sha256()
    sha256_hash.update(input_string.encode('utf-8'))
    return sha256_hash.hexdigest()

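# Example using the standard SHA-256 test vector: sha256_digest('abc') returns
# 'ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad'.
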
def remove_tracking_from_link(url):
    parsed_url = urlparse(url)

    if parsed_url.netloc == 'youtu.be':
        # Extract video ID
        video_id = parsed_url.path[1:]  # Remove leading slash

        # Preserve 't' parameter if it exists
        query_params = parse_qs(parsed_url.query)
        if 't' in query_params:
            new_query_params = {'t': query_params['t']}
            new_query_string = urlencode(new_query_params, doseq=True)
        else:
            new_query_string = ''

        cleaned_url = f"https://youtube.com/watch?v={video_id}"
        if new_query_string:
            new_query_string = new_query_string.replace('t=', 'start=')
            cleaned_url += f"&{new_query_string}"

        return cleaned_url
    else:
        return url

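# Illustrative example: remove_tracking_from_link('https://youtu.be/abc123?si=tracker&t=42')
# returns 'https://youtube.com/watch?v=abc123&start=42' - the 'si' tracking parameter is
# dropped and the timestamp is preserved.
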
def show_ban_message():
    flash(_('You have been banned.'), 'error')
    logout_user()
    resp = make_response(redirect(url_for('main.index')))
    # deliberately misspelled decoy cookie, checked by user_cookie_banned()
    resp.set_cookie('sesion', '17489047567495', expires=datetime(year=2099, month=12, day=30))
    return resp


# search a sorted list using a binary search. Faster than using 'in' with an unsorted list.
def in_sorted_list(arr, target):
    index = bisect.bisect_left(arr, target)
    return index < len(arr) and arr[index] == target

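# Illustrative examples: in_sorted_list([2, 5, 9], 5) -> True, in_sorted_list([2, 5, 9], 4) -> False.
# bisect_left finds the insertion point in O(log n); a single equality check then confirms membership.
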
@cache.memoize(timeout=600)
def recently_upvoted_posts(user_id) -> List[int]:
    post_ids = db.session.execute(text('SELECT post_id FROM "post_vote" WHERE user_id = :user_id AND effect > 0 ORDER BY id DESC LIMIT 1000'),
                                  {'user_id': user_id}).scalars()
    return sorted(post_ids)  # sorted so that in_sorted_list can be used


@cache.memoize(timeout=600)
def recently_downvoted_posts(user_id) -> List[int]:
    post_ids = db.session.execute(text('SELECT post_id FROM "post_vote" WHERE user_id = :user_id AND effect < 0 ORDER BY id DESC LIMIT 1000'),
                                  {'user_id': user_id}).scalars()
    return sorted(post_ids)


@cache.memoize(timeout=600)
def recently_upvoted_post_replies(user_id) -> List[int]:
    reply_ids = db.session.execute(text('SELECT post_reply_id FROM "post_reply_vote" WHERE user_id = :user_id AND effect > 0 ORDER BY id DESC LIMIT 1000'),
                                   {'user_id': user_id}).scalars()
    return sorted(reply_ids)  # sorted so that in_sorted_list can be used


@cache.memoize(timeout=600)
def recently_downvoted_post_replies(user_id) -> List[int]:
    reply_ids = db.session.execute(text('SELECT post_reply_id FROM "post_reply_vote" WHERE user_id = :user_id AND effect < 0 ORDER BY id DESC LIMIT 1000'),
                                   {'user_id': user_id}).scalars()
    return sorted(reply_ids)

def languages_for_form():
    used_languages = []
    other_languages = []
    if current_user.is_authenticated:
        recently_used_language_ids = db.session.execute(text("""SELECT language_id
                                                                FROM (
                                                                    SELECT language_id, posted_at
                                                                    FROM "post"
                                                                    WHERE user_id = :user_id
                                                                    UNION ALL
                                                                    SELECT language_id, posted_at
                                                                    FROM "post_reply"
                                                                    WHERE user_id = :user_id
                                                                ) AS subquery
                                                                GROUP BY language_id
                                                                ORDER BY MAX(posted_at) DESC
                                                                LIMIT 10"""),
                                                        {'user_id': current_user.id}).scalars().all()

        # note: recently_used_language_ids is now a List, ordered with the most recently used at the top
        # but Language.query.filter(Language.id.in_(recently_used_language_ids)) isn't guaranteed to return
        # language results in the same order as that List :(
        for language_id in recently_used_language_ids:
            if language_id is not None:
                used_languages.append((language_id, ""))

        # use 'English' as a default for brand new users (no posts or replies yet)
        # not great, but better than them accidentally using 'Afaraf' (the first in an alphabetical list of languages)
        # FIXME: use site language when it is settable by admins, or anything that avoids hardcoding 'English' in
        if not used_languages:
            id = english_language_id()
            if id:
                used_languages.append((id, ""))

    for language in Language.query.order_by(Language.name).all():
        try:
            i = used_languages.index((language.id, ""))
            used_languages[i] = (language.id, language.name)
        except:
            if language.code != "und":
                other_languages.append((language.id, language.name))

    return used_languages + other_languages


def english_language_id():
    english = Language.query.filter(Language.code == 'en').first()
    return english.id if english else None

def actor_contains_blocked_words(actor):
    actor = actor.lower().strip()
    blocked_words = get_setting('actor_blocked_words')
    if blocked_words and blocked_words.strip() != '':
        for blocked_word in blocked_words.split('\n'):
            blocked_word = blocked_word.lower().strip()
            if blocked_word in actor:
                return True
    return False

def add_to_modlog(action: str, community_id: int = None, reason: str = '', link: str = '', link_text: str = ''):
    """ Adds a new entry to the Moderation Log """
    if action not in ModLog.action_map.keys():
        raise Exception('Invalid action: ' + action)
    if current_user.is_admin() or current_user.is_staff():
        action_type = 'admin'
    else:
        action_type = 'mod'
    db.session.add(ModLog(user_id=current_user.id, community_id=community_id, type=action_type, action=action,
                          reason=reason, link=link, link_text=link_text, public=get_setting('public_modlog', False)))
    db.session.commit()


def add_to_modlog_activitypub(action: str, actor: User, community_id: int = None, reason: str = '', link: str = '',
                              link_text: str = ''):
    """ Adds a new entry to the Moderation Log - identical to above except it takes an 'actor' parameter """
    if action not in ModLog.action_map.keys():
        raise Exception('Invalid action: ' + action)
    if actor.is_instance_admin():
        action_type = 'admin'
    else:
        action_type = 'mod'
    db.session.add(ModLog(user_id=actor.id, community_id=community_id, type=action_type, action=action,
                          reason=reason, link=link, link_text=link_text, public=get_setting('public_modlog', False)))
    db.session.commit()

def authorise_api_user(auth, return_type=None, id_match=None):
    if not auth:
        raise Exception('incorrect_login')
    token = auth[7:]  # remove 'Bearer '

    decoded = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])
    if decoded:
        user_id = decoded['sub']
        issued_at = decoded['iat']  # use to check against blacklisted JWTs
        user = User.query.filter_by(id=user_id, ap_id=None, verified=True, banned=False, deleted=False).one()
        if id_match and user.id != id_match:
            raise Exception('incorrect_login')
        if return_type and return_type == 'model':
            return user
        else:
            return user.id

@cache.memoize(timeout=86400)
def community_ids_from_instances(instance_ids) -> List[int]:
    communities = Community.query.join(Instance, Instance.id == Community.instance_id).filter(Instance.id.in_(instance_ids))
    return [community.id for community in communities]


# Set up a new SQLAlchemy session specifically for Celery tasks
def get_task_session() -> Session:
    # Use the same engine as the main app, but create an independent session
    return Session(bind=db.engine)

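# Typical usage pattern (illustrative), mirroring download_defeds_worker() below:
#   session = get_task_session()
#   try:
#       ...  # do work with session.add() / session.commit()
#   finally:
#       session.close()
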
def download_defeds(defederation_subscription_id: int, domain: str):
    if current_app.debug:
        download_defeds_worker(defederation_subscription_id, domain)
    else:
        download_defeds_worker.delay(defederation_subscription_id, domain)


@celery.task
def download_defeds_worker(defederation_subscription_id: int, domain: str):
    session = get_task_session()
    for defederation_url in retrieve_defederation_list(domain):
        session.add(BannedInstances(domain=defederation_url, reason='auto', subscription_id=defederation_subscription_id))
    session.commit()
    session.close()

def retrieve_defederation_list(domain: str) -> List[str]:
    result = []
    software = instance_software(domain)
    if software == 'lemmy' or software == 'piefed':
        try:
            response = get_request(f'https://{domain}/api/v3/federated_instances')
        except:
            response = None
        if response and response.status_code == 200:
            instance_data = response.json()
            for row in instance_data['federated_instances']['blocked']:
                result.append(row['domain'])
    else:  # Assume mastodon-compatible API
        try:
            response = get_request(f'https://{domain}/api/v1/instance/domain_blocks')
        except:
            response = None
        if response and response.status_code == 200:
            instance_data = response.json()
            for row in instance_data:
                result.append(row['domain'])

    return result


def instance_software(domain: str):
    instance = Instance.query.filter(Instance.domain == domain).first()
    return instance.software.lower() if instance else ''

user2_cache = {}


def jaccard_similarity(user1_upvoted: set, user2_id: int):
    if user2_id not in user2_cache:
        user2_upvoted_posts = ['post/' + str(id) for id in recently_upvoted_posts(user2_id)]
        user2_upvoted_replies = ['reply/' + str(id) for id in recently_upvoted_post_replies(user2_id)]
        user2_cache[user2_id] = set(user2_upvoted_posts + user2_upvoted_replies)

    user2_upvoted = user2_cache[user2_id]

    if len(user2_upvoted) > 12:
        intersection = len(user1_upvoted.intersection(user2_upvoted))
        union = len(user1_upvoted.union(user2_upvoted))

        return (intersection / union) * 100
    else:
        return 0
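
# Worked example (illustrative): if user1 upvoted {'post/1', 'post/2', 'post/3'} and user2's
# history shares two of those items out of 16 total distinct items between them, the Jaccard
# similarity is 2 / 16 * 100 = 12.5. Higher values mean more overlapping voting history;
# with 12 or fewer items of history for user2 the function returns 0.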