from __future__ import annotations
import bisect
import hashlib
import mimetypes
import random
import tempfile
import urllib
from collections import defaultdict
from datetime import datetime, timedelta, date
from time import sleep
from typing import List, Literal, Union
import httpx
import markdown2
import math
from urllib.parse import urlparse, parse_qs, urlencode
from functools import wraps
import flask
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
import warnings
import jwt
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
import os
from flask import current_app, json, redirect, url_for, request, make_response, Response, g, flash
from flask_babel import _
from flask_login import current_user, logout_user
from sqlalchemy import text, or_
from sqlalchemy.orm import Session
from wtforms.fields import SelectField, SelectMultipleField
from wtforms.widgets import Select, html_params, ListWidget, CheckboxInput
from app import db, cache, httpx_client
import re
from moviepy.editor import VideoFileClip
from PIL import Image, ImageOps
from app.models import Settings, Domain, Instance, BannedInstances, User, Community, DomainBlock, ActivityPubLog, IpBan, \
Site, Post, PostReply, utcnow, Filter, CommunityMember, InstanceBlock, CommunityBan, Topic, UserBlock, Language, \
File, ModLog, CommunityBlock
# Flask's render_template function, with support for themes added
def render_template(template_name: str, **context) -> Response:
theme = current_theme()
if theme != '' and os.path.exists(f'app/templates/themes/{theme}/{template_name}'):
content = flask.render_template(f'themes/{theme}/{template_name}', **context)
else:
content = flask.render_template(template_name, **context)
# Browser caching using ETags and Cache-Control
resp = make_response(content)
if current_user.is_anonymous:
if 'etag' in context:
resp.headers.add_header('ETag', context['etag'])
resp.headers.add_header('Cache-Control', 'no-cache, max-age=600, must-revalidate')
return resp
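# Usage sketch (the route and template names are illustrative, not defined in this module):
#   @bp.route('/user/<user_name>')
#   def show_user(user_name):
#       user = User.query.filter_by(user_name=user_name).first()
#       return render_template('user/show.html', user=user, etag=f'user_{user.id}')
# If the active theme ships app/templates/themes/<theme>/user/show.html that file wins,
# otherwise the stock template is used; the ETag header is only set for anonymous visitors.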
def request_etag_matches(etag):
if 'If-None-Match' in request.headers:
old_etag = request.headers['If-None-Match']
return old_etag == etag
return False
def return_304(etag, content_type=None):
resp = make_response('', 304)
resp.headers.add_header('ETag', request.headers['If-None-Match'])
resp.headers.add_header('Cache-Control', 'no-cache, max-age=600, must-revalidate')
resp.headers.add_header('Vary', 'Accept, Cookie, Accept-Language')
if content_type:
resp.headers.set('Content-Type', content_type)
return resp
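# Conditional-GET flow combining the two helpers above (a sketch; how the etag is derived
# is an assumption -- any fingerprint that changes when the content changes will do):
#   etag = f"post_{post.id}_{post.edited_at}"
#   if request_etag_matches(etag):
#       return return_304(etag)
#   return render_template('post/post.html', post=post, etag=etag)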
# Jinja: when a file was modified. Useful for cache-busting
def getmtime(filename):
if os.path.exists('static/' + filename):
return os.path.getmtime('static/' + filename)
# do a GET request to a uri, return the result
def get_request(uri, params=None, headers=None) -> httpx.Response:
timeout = 15 if 'washingtonpost.com' in uri else 5 # Washington Post is really slow on og:image for some reason
if headers is None:
headers = {'User-Agent': 'PieFed/1.0'}
else:
headers.update({'User-Agent': 'PieFed/1.0'})
if params and '/webfinger' in uri:
payload_str = urllib.parse.urlencode(params, safe=':@')
else:
payload_str = urllib.parse.urlencode(params) if params else None
try:
response = httpx_client.get(uri, params=payload_str, headers=headers, timeout=timeout, follow_redirects=True)
except ValueError as ex:
# Convert to a more generic error we handle
raise httpx.HTTPError(f"HTTPError: {str(ex)}") from None
except httpx.ReadError as connection_error:
try: # retry, this time with a longer timeout
sleep(random.randint(3, 10))
response = httpx_client.get(uri, params=payload_str, headers=headers, timeout=timeout * 2, follow_redirects=True)
except Exception as e:
current_app.logger.info(f"{uri} {connection_error}")
            raise httpx.ReadError(f"HTTPReadError: {str(e)}") from connection_error
except httpx.HTTPError as read_timeout:
try: # retry, this time with a longer timeout
sleep(random.randint(3, 10))
response = httpx_client.get(uri, params=payload_str, headers=headers, timeout=timeout * 2, follow_redirects=True)
except Exception as e:
current_app.logger.info(f"{uri} {read_timeout}")
raise httpx.HTTPError(f"HTTPError: {str(e)}") from read_timeout
return response
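# Example call (sketch): fetching a remote ActivityPub document. The Accept header is the
# standard ActivityPub media type; the try/except matters because get_request re-raises
# httpx.HTTPError once its single retry with a doubled timeout has also failed.
#   try:
#       actor = get_request('https://example.com/u/alice', headers={'Accept': 'application/activity+json'}).json()
#   except httpx.HTTPError:
#       actor = None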
# Same as get_request except updates instance on failure and does not raise any exceptions
def get_request_instance(uri, instance: Instance, params=None, headers=None) -> httpx.Response:
try:
return get_request(uri, params, headers)
    except Exception:
instance.failures += 1
instance.update_dormant_gone()
db.session.commit()
return httpx.Response(status_code=500)
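# Because failures surface as a synthetic 500 response rather than an exception, callers
# can branch on the status code without try/except (sketch):
#   response = get_request_instance(uri, instance)
#   if response.status_code == 200:
#       ...  # parse the response normally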
# do a HEAD request to a uri, return the result
def head_request(uri, params=None, headers=None) -> httpx.Response:
if headers is None:
headers = {'User-Agent': 'PieFed/1.0'}
else:
headers.update({'User-Agent': 'PieFed/1.0'})
try:
        response = httpx_client.head(uri, params=params, headers=headers, timeout=5, follow_redirects=True)
except httpx.HTTPError as er:
current_app.logger.info(f"{uri} {er}")
raise httpx.HTTPError(f"HTTPError: {str(er)}") from er
return response
# Retrieves an arbitrary object from a persistent key-value store. Cached.
# Similar to g.site.* except g.site.* is populated on every single page load, so g.site is best for settings that are
# accessed very often (e.g. every page load).
@cache.memoize(timeout=50)
def get_setting(name: str, default=None):
setting = Settings.query.filter_by(name=name).first()
if setting is None:
return default
else:
return json.loads(setting.value)
# Saves an arbitrary object into the persistent key-value store
def set_setting(name: str, value):
setting = Settings.query.filter_by(name=name).first()
if setting is None:
db.session.add(Settings(name=name, value=json.dumps(value)))
else:
setting.value = json.dumps(value)
db.session.commit()
cache.delete_memoized(get_setting)
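# Round-trip sketch ('registration_open' is an illustrative key, not necessarily one the app uses):
#   set_setting('registration_open', True)       # stored as JSON, memoized cache invalidated
#   get_setting('registration_open', False)      # -> True; memoized for 50 seconds thereafter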
# Return the contents of a file as a string. Inspired by PHP's function of the same name.
def file_get_contents(filename):
with open(filename, 'r') as file:
contents = file.read()
return contents
random_chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
def gibberish(length: int = 10) -> str:
return "".join([random.choice(random_chars) for x in range(length)])
# used by @cache.cached() for home page and post caching
def make_cache_key(sort=None, post_id=None, view_filter=None):
if current_user.is_anonymous:
return f'{request.url}_{sort}_{post_id}_anon_{request.headers.get("Accept")}_{request.headers.get("Accept-Language")}' # The Accept header differentiates between activitypub requests and everything else
else:
return f'{request.url}_{sort}_{post_id}_user_{current_user.id}'
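# Intended use (sketch, assuming flask-caching's make_cache_key hook):
#   @bp.route('/')
#   @cache.cached(timeout=60, make_cache_key=make_cache_key)
#   def index(): ...
# Anonymous visitors share one entry per URL/Accept/Accept-Language combination, while each
# logged-in user gets a private entry so personalised pages never leak between accounts.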
def is_image_url(url):
common_image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp', '.avif', '.svg+xml', '.svg+xml; charset=utf-8']
mime_type = mime_type_using_head(url)
if mime_type:
mime_type_parts = mime_type.split('/')
return f'.{mime_type_parts[1]}' in common_image_extensions
else:
parsed_url = urlparse(url)
path = parsed_url.path.lower()
return any(path.endswith(extension) for extension in common_image_extensions)
def is_video_url(url: str) -> bool:
common_video_extensions = ['.mp4', '.webm']
mime_type = mime_type_using_head(url)
if mime_type:
mime_type_parts = mime_type.split('/')
return f'.{mime_type_parts[1]}' in common_video_extensions
else:
parsed_url = urlparse(url)
path = parsed_url.path.lower()
return any(path.endswith(extension) for extension in common_video_extensions)
def is_video_hosting_site(url: str) -> bool:
if url is None or url == '':
return False
video_hosting_sites = ['https://youtube.com', 'https://www.youtube.com', 'https://youtu.be', 'https://www.vimeo.com', 'https://www.redgifs.com/watch/']
for starts_with in video_hosting_sites:
if url.startswith(starts_with):
return True
if 'videos/watch' in url: # PeerTube
return True
return False
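# Behaviour sketch for the three URL classifiers above (the example.com URLs are illustrative):
#   is_image_url('https://example.com/pic.jpeg')            # True via the extension fallback
#   is_video_url('https://example.com/clip.mp4')            # True via HEAD mime type or suffix
#   is_video_hosting_site('https://youtu.be/dQw4w9WgXcQ')   # True via the prefix list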
@cache.memoize(timeout=10)
def mime_type_using_head(url):
# Find the mime type of a url by doing a HEAD request - this is the same as GET except only the HTTP headers are transferred
try:
response = httpx_client.head(url, timeout=5)
response.raise_for_status() # Raise an exception for HTTP errors
content_type = response.headers.get('Content-Type')
if content_type:
if content_type == 'application/octet-stream':
return ''
return content_type
else:
return ''
    except httpx.HTTPError:
return ''
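# Returns strings like 'image/png', or '' when the request fails or the server reports the
# uninformative generic 'application/octet-stream'; the 10-second memoize keeps back-to-back
# checks of the same URL (e.g. is_image_url then is_video_url) from issuing duplicate HEAD requests.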
# sanitise HTML using an allow list
def allowlist_html(html: str, a_target='_blank') -> str:
if html is None or html == '':
return ''
allowed_tags = ['p', 'strong', 'a', 'ul', 'ol', 'li', 'em', 'blockquote', 'cite', 'br', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'pre',
'code', 'img', 'details', 'summary', 'table', 'tr', 'td', 'th', 'tbody', 'thead', 'hr', 'span', 'small', 'sub', 'sup',
's']
# Parse the HTML using BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
# Find all plain text links, convert to tags
re_url = re.compile(r'(http[s]?://[!-~]+)') # http(s):// followed by chars in ASCII range 33 to 126
    for tag in soup.find_all(string=True):
tags = []
url = False
for t in re_url.split(tag.string):
if re_url.match(t):
# Avoid picking up trailing punctuation for raw URLs in text
href = t[:-1] if t[-1] in ['.', ',', ')', '!', ':', ';', '?'] else t
a = soup.new_tag("a", href=href)
a.string = href
tags.append(a)
if href != t:
tags.append(t[-1])
url = True
else:
tags.append(t)
if url:
for t in tags:
tag.insert_before(t)
tag.extract()
# Filter tags, leaving only safe ones
for tag in soup.find_all():
# If the tag is not in the allowed_tags list, remove it and its contents
if tag.name not in allowed_tags:
tag.extract()
else:
# Filter and sanitize attributes
for attr in list(tag.attrs):
if attr not in ['href', 'src', 'alt', 'class']:
del tag[attr]
# Remove some mastodon guff - spans with class "invisible"
if tag.name == 'span' and 'class' in tag.attrs and 'invisible' in tag.attrs['class']:
tag.extract()
# Add nofollow and target=_blank to anchors
if tag.name == 'a':
tag.attrs['rel'] = 'nofollow ugc'
tag.attrs['target'] = a_target
# Add loading=lazy to images
if tag.name == 'img':
tag.attrs['loading'] = 'lazy'
if tag.name == 'table':
tag.attrs['class'] = 'table'
clean_html = str(soup)
# avoid wrapping anchors around existing anchors (e.g. if raw URL already wrapped by remote PieFed instance)
    re_double_anchor = re.compile(r'<a href=".+?" rel="nofollow ugc" target="_blank">(<a href=".+?" rel="nofollow ugc" target="_blank">.*?<\/a>)<\/a>')
    clean_html = re_double_anchor.sub(r'\1', clean_html)
# avoid returning empty anchors
    re_empty_anchor = re.compile(r'<a href="(.*?)" rel="nofollow ugc" target="_blank"><\/a>')
    clean_html = re_empty_anchor.sub(r'\1', clean_html)
# replace lemmy's spoiler markdown left in HTML
    re_spoiler = re.compile(r':{3}\s*?spoiler\s+?(\S.+?)(?:\n|</p>)(.+?)(?:\n|<p>):{3}', re.S)
    clean_html = re_spoiler.sub(r'<details><summary>\1</summary><p>\2</p></details>', clean_html)
    # replace strikethrough markdown left in HTML
    re_strikethrough = re.compile(r'~~(.*)~~')
    clean_html = re_strikethrough.sub(r'<s>\1</s>', clean_html)
    # replace subscript markdown left in HTML
    re_subscript = re.compile(r'~(\S+)~')
    clean_html = re_subscript.sub(r'<sub>\1</sub>', clean_html)
    # replace superscript markdown left in HTML
    re_superscript = re.compile(r'\^(\S+)\^')
    clean_html = re_superscript.sub(r'<sup>\1</sup>', clean_html)
    # replace <a> anchors for mp4 links with <video> tags