2023-12-21 18:34:45 -08:00
from __future__ import annotations
2024-04-10 19:04:57 -07:00
import bisect
2024-02-24 19:24:50 -08:00
import hashlib
2024-02-27 15:55:30 -08:00
import mimetypes
2023-09-09 01:46:40 -07:00
import random
2024-04-15 21:35:12 -07:00
import tempfile
2024-02-20 11:36:00 -08:00
import urllib
2024-01-10 23:39:22 -08:00
from collections import defaultdict
from datetime import datetime , timedelta , date
2024-01-01 22:41:00 -08:00
from typing import List , Literal , Union
2023-12-07 20:13:38 -08:00
2023-10-10 02:25:37 -07:00
import markdown2
import math
2024-03-08 01:09:54 -08:00
from urllib . parse import urlparse , parse_qs , urlencode
2023-10-22 17:03:35 -07:00
from functools import wraps
2023-09-16 00:09:04 -07:00
import flask
2024-04-15 21:35:12 -07:00
from bs4 import BeautifulSoup , MarkupResemblesLocatorWarning
2024-03-16 13:39:16 -07:00
import warnings
2024-06-05 01:33:00 -07:00
from app . activitypub . signature import default_context
2024-03-16 13:39:16 -07:00
warnings . filterwarnings ( " ignore " , category = MarkupResemblesLocatorWarning )
2023-08-29 03:01:06 -07:00
import requests
import os
2024-04-03 00:48:39 -07:00
from flask import current_app , json , redirect , url_for , request , make_response , Response , g , flash
from flask_login import current_user , logout_user
2024-01-10 23:39:22 -08:00
from sqlalchemy import text , or_
2023-12-13 00:04:11 -08:00
from wtforms . fields import SelectField , SelectMultipleField
from wtforms . widgets import Select , html_params , ListWidget , CheckboxInput
2023-09-16 00:09:04 -07:00
from app import db , cache
2024-01-09 22:54:54 -08:00
import re
2024-04-15 21:35:12 -07:00
from moviepy . editor import VideoFileClip
2024-06-21 23:18:26 -07:00
from PIL import Image , ImageOps
2023-12-21 18:34:45 -08:00
2024-02-01 18:30:03 -08:00
from app . email import send_welcome_email
2024-01-01 22:41:00 -08:00
from app . models import Settings , Domain , Instance , BannedInstances , User , Community , DomainBlock , ActivityPubLog , IpBan , \
2024-06-21 23:18:26 -07:00
Site , Post , PostReply , utcnow , Filter , CommunityMember , InstanceBlock , CommunityBan , Topic , UserBlock , Language , \
File
2023-09-16 00:09:04 -07:00
# Flask's render_template function, with support for themes added
2023-12-09 18:10:09 -08:00
def render_template(template_name: str, **context) -> Response:
    """Render ``template_name``, preferring the current theme's override if it exists.

    For anonymous visitors the response also gets browser-caching headers:
    an ETag (only when ``context`` contains an ``etag`` entry) and a
    Cache-Control header.
    """
    active_theme = current_theme()
    themed_name = f'themes/{active_theme}/{template_name}'
    if active_theme != '' and os.path.exists(f'app/templates/{themed_name}'):
        body = flask.render_template(themed_name, **context)
    else:
        body = flask.render_template(template_name, **context)

    # Browser caching using ETags and Cache-Control
    response = make_response(body)
    if current_user.is_anonymous:
        if 'etag' in context:
            response.headers.add_header('ETag', context['etag'])
        response.headers.add_header('Cache-Control', 'no-cache, max-age=600, must-revalidate')
    return response
def request_etag_matches(etag):
    """Return True when the request carries an If-None-Match header equal to ``etag``."""
    if 'If-None-Match' not in request.headers:
        return False
    return request.headers['If-None-Match'] == etag
2023-12-11 21:28:49 -08:00
def return_304(etag, content_type=None):
    """Build an empty 304 response with caching headers.

    The ETag header echoes the request's If-None-Match value (the ``etag``
    argument itself is not read). ``content_type``, when given, is set as the
    Content-Type header.
    """
    not_modified = make_response('', 304)
    caching_headers = (
        ('ETag', request.headers['If-None-Match']),
        ('Cache-Control', 'no-cache, max-age=600, must-revalidate'),
        ('Vary', 'Accept, Cookie, Accept-Language'),
    )
    for header_name, header_value in caching_headers:
        not_modified.headers.add_header(header_name, header_value)
    if content_type:
        not_modified.headers.set('Content-Type', content_type)
    return not_modified
2023-08-29 03:01:06 -07:00
# Jinja: when a file was modified. Useful for cache-busting
def getmtime(filename):
    """Return the modification time of ``static/<filename>``, or None if it cannot be read.

    Asks for the mtime directly instead of checking existence first, so a file
    that disappears between the two calls can no longer raise.
    """
    try:
        return os.path.getmtime('static/' + filename)
    except OSError:
        return None
2023-08-29 03:01:06 -07:00
# do a GET request to a uri, return the result
def get_request(uri, params=None, headers=None) -> requests.Response:
    """Perform a GET request to ``uri`` and return the response.

    ``params`` are URL-encoded before sending; for webfinger URIs the ``:`` and
    ``@`` characters are left unescaped. The User-Agent header is always set to
    PieFed's. SSL errors, read timeouts and connection errors are logged and
    re-raised; a ValueError is converted to ``requests.exceptions.RequestException``.
    """
    # Build a fresh dict so the caller's headers are never mutated.
    headers = {**(headers or {}), 'User-Agent': 'PieFed/1.0'}
    if params and '/webfinger' in uri:
        payload_str = urllib.parse.urlencode(params, safe=':@')
    else:
        payload_str = urllib.parse.urlencode(params) if params else None
    try:
        timeout = 15 if 'washingtonpost.com' in uri else 5  # Washington Post is really slow on og:image for some reason
        response = requests.get(uri, params=payload_str, headers=headers, timeout=timeout, allow_redirects=True)
    except requests.exceptions.SSLError as invalid_cert:
        # Not our problem if the other end doesn't have proper SSL
        current_app.logger.info(f"{uri} {invalid_cert}")
        raise requests.exceptions.SSLError from invalid_cert
    except ValueError as ex:
        # Convert to a more generic error we handle
        raise requests.exceptions.RequestException(f"InvalidCodepoint: {str(ex)}") from None
    except requests.exceptions.ReadTimeout as read_timeout:
        current_app.logger.info(f"{uri} {read_timeout}")
        raise requests.exceptions.ReadTimeout from read_timeout
    except requests.exceptions.ConnectionError as connection_error:
        current_app.logger.info(f"{uri} {connection_error}")
        raise requests.exceptions.ConnectionError from connection_error

    return response
2023-09-02 21:30:20 -07:00
2023-12-28 20:32:35 -08:00
# do a HEAD request to a uri, return the result
def head_request(uri, params=None, headers=None) -> requests.Response:
    """Perform a HEAD request to ``uri`` (5 second timeout, following redirects).

    The User-Agent header is always set to PieFed's. SSL errors and read
    timeouts are logged and re-raised; a ValueError is converted to
    ``requests.exceptions.RequestException``.
    """
    # Build a fresh dict so the caller's headers are never mutated.
    headers = {**(headers or {}), 'User-Agent': 'PieFed/1.0'}
    try:
        response = requests.head(uri, params=params, headers=headers, timeout=5, allow_redirects=True)
    except requests.exceptions.SSLError as invalid_cert:
        # Not our problem if the other end doesn't have proper SSL
        current_app.logger.info(f"{uri} {invalid_cert}")
        raise requests.exceptions.SSLError from invalid_cert
    except ValueError as ex:
        # Convert to a more generic error we handle
        raise requests.exceptions.RequestException(f"InvalidCodepoint: {str(ex)}") from None
    except requests.exceptions.ReadTimeout as read_timeout:
        current_app.logger.info(f"{uri} {read_timeout}")
        raise requests.exceptions.ReadTimeout from read_timeout

    return response
2023-09-16 00:09:04 -07:00
# retrieves an arbitrary object from a persistent key-value store. cached.
2023-09-17 02:19:51 -07:00
@cache.memoize(timeout=50)
def get_setting(name: str, default=None):
    """Return the JSON-decoded value stored under ``name``, or ``default`` if no such row exists.

    Results are memoized for 50 seconds.
    """
    row = Settings.query.filter_by(name=name).first()
    return default if row is None else json.loads(row.value)
2023-09-09 01:46:40 -07:00
# saves an arbitrary object into the persistent key-value store
2023-09-02 21:30:20 -07:00
def set_setting(name: str, value):
    """Store ``value`` (JSON-encoded) under ``name``, creating or updating the row.

    Commits the session and clears the memoized ``get_setting`` results.
    """
    row = Settings.query.filter_by(name=name).first()
    if row is not None:
        row.value = json.dumps(value)
    else:
        db.session.add(Settings(name=name, value=json.dumps(value)))
    db.session.commit()
    cache.delete_memoized(get_setting)
2023-09-05 01:25:10 -07:00
# Return the contents of a file as a string. Inspired by PHP's function of the same name.
def file_get_contents(filename):
    """Open ``filename`` in text mode and return everything in it as a single string."""
    with open(filename, 'r') as handle:
        return handle.read()
2023-09-09 01:46:40 -07:00
random_chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'


def gibberish(length: int = 10) -> str:
    """Return a string of ``length`` characters picked at random from ``random_chars``."""
    picks = (random.choice(random_chars) for _ in range(length))
    return "".join(picks)
2023-09-16 00:09:04 -07:00
def is_image_url(url):
    """Return True when ``url`` appears to point at an image.

    The Content-Type reported by ``mime_type_using_head`` is used when
    available; otherwise the decision falls back to the file extension in the
    URL path.
    """
    common_image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']
    mime_type = mime_type_using_head(url)
    if mime_type:
        # Content-Type may carry parameters ("image/png; charset=binary") and
        # arbitrary casing; strip both before comparing the subtype.
        essence = mime_type.split(';', 1)[0].strip().lower()
        _, slash, subtype = essence.partition('/')
        # A header without a '/' is malformed: treat it as "not an image"
        # rather than raising IndexError.
        return bool(slash) and f'.{subtype}' in common_image_extensions
    path = urlparse(url).path.lower()
    return any(path.endswith(extension) for extension in common_image_extensions)
2023-09-16 00:09:04 -07:00
2024-04-16 01:59:58 -07:00
def is_video_url(url):
    """Return True when ``url`` appears to point at a video.

    The Content-Type reported by ``mime_type_using_head`` is used when
    available; otherwise the decision falls back to the file extension in the
    URL path.
    """
    common_video_extensions = ['.mp4', '.webm']
    mime_type = mime_type_using_head(url)
    if mime_type:
        # Content-Type may carry parameters ("video/mp4; codecs=...") and
        # arbitrary casing; strip both before comparing the subtype.
        essence = mime_type.split(';', 1)[0].strip().lower()
        _, slash, subtype = essence.partition('/')
        # A header without a '/' is malformed: treat it as "not a video"
        # rather than raising IndexError.
        return bool(slash) and f'.{subtype}' in common_video_extensions
    path = urlparse(url).path.lower()
    return any(path.endswith(extension) for extension in common_video_extensions)
@cache.memoize(timeout=10)
def mime_type_using_head(url):
    """Return the Content-Type header of ``url`` obtained via a HEAD request.

    A HEAD request is the same as GET except only the HTTP headers are
    transferred. Returns '' when the request fails, the server answers with an
    HTTP error, or no Content-Type header is present. Results are memoized for
    10 seconds.
    """
    try:
        # Without a timeout an unresponsive server would block this call indefinitely.
        response = requests.head(url, timeout=5)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except requests.exceptions.RequestException:
        return ''
    return response.headers.get('Content-Type') or ''
2024-04-16 01:59:58 -07:00
2023-09-16 00:09:04 -07:00
# sanitise HTML using an allow list
def allowlist_html(html: str) -> str:
    """Sanitise ``html`` using an allow list of tags and attributes.

    Steps, in order: bare http(s) URLs in text nodes become <a> tags; tags not
    on the allow list are removed together with their contents; attributes
    other than href/src/alt/class are dropped; anchors, images and tables get
    fixed extra attributes; finally, anchors that ended up empty get their
    href as link text.
    """
    if html is None or html == '':
        return ''
    allowed_tags = ['p', 'strong', 'a', 'ul', 'ol', 'li', 'em', 'blockquote', 'cite', 'br', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'pre',
                    'code', 'img', 'details', 'summary', 'table', 'tr', 'td', 'th', 'tbody', 'thead', 'hr', 'span', 'small', 'sub', 'sup']
    kept_attributes = ['href', 'src', 'alt', 'class']
    trailing_punctuation = ['.', ',', ')', '!', ':', ';', '?']

    # Parse the HTML using BeautifulSoup
    soup = BeautifulSoup(html, 'html.parser')

    # Find all plain text links, convert to <a> tags
    re_url = re.compile(r'(http[s]?://[!-~]+)')  # http(s):// followed by chars in ASCII range 33 to 126
    for text_node in soup.find_all(text=True):
        pieces = []
        found_url = False
        for chunk in re_url.split(text_node.string):
            if not re_url.match(chunk):
                pieces.append(chunk)
                continue
            # Avoid picking up trailing punctuation for raw URLs in text
            href = chunk[:-1] if chunk[-1] in trailing_punctuation else chunk
            anchor = soup.new_tag("a", href=href)
            anchor.string = href
            pieces.append(anchor)
            if href != chunk:
                # put the punctuation we trimmed off back as plain text
                pieces.append(chunk[-1])
            found_url = True
        if found_url:
            # swap the original text node for the text/anchor sequence
            for piece in pieces:
                text_node.insert_before(piece)
            text_node.extract()

    # Filter tags, leaving only safe ones
    for element in soup.find_all():
        # If the tag is not in the allowed_tags list, remove it and its contents
        if element.name not in allowed_tags:
            element.extract()
            continue
        # Filter and sanitize attributes
        for attribute in list(element.attrs):
            if attribute not in kept_attributes:
                del element[attribute]
        # Add nofollow and target=_blank to anchors
        if element.name == 'a':
            element.attrs['rel'] = 'nofollow ugc'
            element.attrs['target'] = '_blank'
        # Add loading=lazy to images
        if element.name == 'img':
            element.attrs['loading'] = 'lazy'
        if element.name == 'table':
            element.attrs['class'] = 'table'

    # avoid returning empty anchors
    re_empty_anchor = re.compile(r'<a href="(.*?)" rel="nofollow ugc" target="_blank"><\/a>')
    serialized = str(soup)
    return re_empty_anchor.sub(r'<a href="\1" rel="nofollow ugc" target="_blank">\1</a>', serialized)
2023-09-16 00:09:04 -07:00
2024-05-14 11:33:08 -07:00
# this is for pyfedi's version of Markdown (differs from lemmy for: newlines for soft breaks, ...)
2023-10-10 02:25:37 -07:00
def markdown_to_html(markdown_text) -> str:
    """Convert pyfedi-flavoured Markdown to sanitised HTML ('' for empty input).

    Newlines and backslashes produce line breaks, lemmy's ``::: spoiler``
    blocks become <details>/<summary>, and the result is passed through
    ``allowlist_html``.
    """
    if not markdown_text:
        return ''
    extras = {'middle-word-em': False, 'tables': True, 'fenced-code-blocks': True, 'strike': True,
              'breaks': {'on_newline': True, 'on_backslash': True}}
    raw_html = markdown2.markdown(markdown_text, safe_mode=True, extras=extras)
    # support lemmy's spoiler format
    re_spoiler = re.compile(r':{3}\s*?spoiler\s+?(\S.+?)(?:\n|</p>)(.+?)(?:\n|<p>):{3}', re.S)
    with_spoilers = re_spoiler.sub(r'<details><summary>\1</summary><p>\2</p></details>', raw_html)
    return allowlist_html(with_spoilers)
# this is for lemmy's version of Markdown (can be removed in future - when HTML from them filtered through an allow_list is used, instead of MD)
def lemmy_markdown_to_html(markdown_text) -> str:
    """Convert lemmy-flavoured Markdown to sanitised HTML ('' for empty input).

    Same pipeline as the pyfedi variant but without the 'breaks' extra.
    """
    if not markdown_text:
        return ''
    extras = {'middle-word-em': False, 'tables': True, 'fenced-code-blocks': True, 'strike': True}
    raw_html = markdown2.markdown(markdown_text, safe_mode=True, extras=extras)
    # replace lemmy spoiler tokens with appropriate html tags instead.
    re_spoiler = re.compile(r':{3}\s*?spoiler\s+?(\S.+?)(?:\n|</p>)(.+?)(?:\n|<p>):{3}', re.S)
    with_spoilers = re_spoiler.sub(r'<details><summary>\1</summary><p>\2</p></details>', raw_html)
    return allowlist_html(with_spoilers)
2023-10-10 02:25:37 -07:00
2023-10-23 00:18:46 -07:00
def markdown_to_text(markdown_text) -> str:
    """Return ``markdown_text`` with every "# " sequence removed ('' for empty/None input)."""
    return markdown_text.replace("# ", '') if markdown_text else ''
2024-05-21 03:20:08 -07:00
def html_to_text(html) -> str:
    """Return the text content of ``html`` with all markup removed ('' for empty/None input)."""
    if html is None or html == '':
        return ''
    return BeautifulSoup(html, 'html.parser').get_text()
2024-03-26 15:46:15 -07:00
def microblog_content_to_title(html: str) -> str:
    """Derive a post title from the HTML body of a microblog post.

    The candidate text is the first <p> with at least 5 non-blank characters
    (or the last <p> if none qualifies), or the whole text when there is no
    <p>. It is cut at the first '.', '?' or '!' (keeping '?'/'!'), and a
    result longer than 150 characters is cut back to a word boundary.
    Without any such punctuation, text of 10+ characters is shortened to fit
    197 characters and anything shorter becomes a placeholder.
    """
    title = ''
    if '<p>' in html:
        soup = BeautifulSoup(html, 'html.parser')
        for paragraph in soup.find_all('p'):
            title = paragraph.get_text(separator=" ")
            if title and title.strip() != '' and len(title.strip()) >= 5:
                break
    else:
        title = html_to_text(html)

    # Find the earliest occurrence of either '.' or '?' or '!'
    question_index = title.find('?')
    exclamation_index = title.find('!')
    positions = [pos for pos in (title.find('.'), question_index, exclamation_index) if pos != -1]

    # there's no recognised punctuation
    if not positions:
        if len(title) < 10:
            return '(content in post body)'
        title = title.replace(' @ ', '').replace(' # ', '')
        return shorten_string(title, 197)

    end_index = min(positions)
    if question_index == end_index:
        end_index += 1  # Add the ? back on
    if exclamation_index == end_index:
        end_index += 1  # Add the ! back on (also keeps a '!' directly after a '?')
    title = title[:end_index]

    if len(title) > 150:
        # last space within the first 150 characters; nothing usable if it is absent or at position 0
        cut = title.rfind(' ', 0, 150)
        title = title[:cut] + ' ...' if cut > 0 else ''

    return title
2024-03-26 15:46:15 -07:00
2024-03-28 19:58:25 -07:00
def community_link_to_href(link: str) -> str:
    """Replace every ``!name@host`` mention in ``link`` with an anchor to this server's community lookup page."""
    mention = r"!([a-zA-Z0-9_.-]*)@([a-zA-Z0-9_.-]*)\b"
    lookup_prefix = r'<a href=https://' + current_app.config['SERVER_NAME'] + r'/community/lookup/'
    replacement = lookup_prefix + r'\g<1>/\g<2>>' + r'!\g<1>@\g<2></a>'
    return re.sub(mention, replacement, link)
2023-11-21 23:48:27 -08:00
def domain_from_url(url: str, create=True) -> Domain:
    """Return the Domain row for the host of ``url``, or None if the URL has no hostname.

    The URL is lower-cased and 'www.' is removed before parsing; youtu.be is
    treated as youtube.com. When no row exists and ``create`` is true, a new
    Domain is added and committed.
    """
    parsed_url = urlparse(url.lower().replace('www.', ''))
    if not (parsed_url and parsed_url.hostname):
        return None

    find_this = parsed_url.hostname.lower()
    if find_this == 'youtu.be':
        find_this = 'youtube.com'

    domain = Domain.query.filter_by(name=find_this).first()
    if domain is None and create:
        domain = Domain(name=find_this)
        db.session.add(domain)
        db.session.commit()
    return domain
2023-09-16 00:09:04 -07:00
2023-10-02 02:16:44 -07:00
def shorten_string(input_str, max_length=50):
    """Return ``input_str`` unchanged when it fits in ``max_length`` characters.

    Longer input is cut to ``max_length - 3`` characters followed by '…'.
    Empty or None input gives ''.
    """
    if not input_str:
        return ''
    if len(input_str) > max_length:
        return input_str[:max_length - 3] + '…'
    return input_str
2023-10-02 02:16:44 -07:00
def shorten_url(input: str, max_length=20):
    """Strip the http(s):// scheme from a URL and shorten the rest to ``max_length`` characters.

    ``max_length`` is forwarded to ``shorten_string``; previously it was
    accepted but ignored, so that function's own default was applied instead.
    """
    without_scheme = input.replace('https://', '').replace('http://', '')
    return shorten_string(without_scheme, max_length)
2023-10-10 02:25:37 -07:00
# the number of characters in the shortened display form of a number (see shorten_number). e.g. 1000 is shown as '1.0k', so 4
def digits(input: int) -> int:
    """Return the length of the string ``shorten_number`` produces for ``input``."""
    displayed = shorten_number(input)
    return len(displayed)
2023-10-20 19:49:01 -07:00
@cache.memoize(timeout=50)
def user_access(permission: str, user_id: int) -> bool:
    """Return True when any role held by ``user_id`` grants ``permission``. Memoized for 50 seconds."""
    query = text('SELECT * FROM "role_permission" as rp '
                 'INNER JOIN user_role ur on rp.role_id = ur.role_id '
                 'WHERE ur.user_id = :user_id AND rp.permission = :permission')
    matching_row = db.session.execute(query, {'user_id': user_id, 'permission': permission}).first()
    return matching_row is not None
2024-01-03 20:07:02 -08:00
@cache.memoize(timeout=10)
def community_membership(user: User, community: Community) -> int:
    """Return ``user.subscribed(community.id)``, or False when ``community`` is None. Memoized for 10 seconds."""
    return False if community is None else user.subscribed(community.id)
2024-03-21 01:19:50 -07:00
@cache.memoize(timeout=86400)
def communities_banned_from(user_id: int) -> List[int]:
    """Return the ids of every community ``user_id`` has a CommunityBan in. Memoized for a day."""
    bans = CommunityBan.query.filter(CommunityBan.user_id == user_id).all()
    return [ban.community_id for ban in bans]
2023-12-16 03:12:49 -08:00
@cache.memoize(timeout=86400)
def blocked_domains(user_id) -> List[int]:
    """Return the domain ids from ``user_id``'s DomainBlock rows. Memoized for a day."""
    return [row.domain_id for row in DomainBlock.query.filter_by(user_id=user_id)]
2024-03-12 00:06:24 -07:00
@cache.memoize(timeout=86400)
def blocked_instances(user_id) -> List[int]:
    """Return the instance ids from ``user_id``'s InstanceBlock rows. Memoized for a day."""
    return [row.instance_id for row in InstanceBlock.query.filter_by(user_id=user_id)]
2024-04-13 13:57:46 -07:00
@cache.memoize(timeout=86400)
def blocked_users(user_id) -> List[int]:
    """Return the blocked user ids from UserBlock rows whose blocker is ``user_id``. Memoized for a day."""
    return [row.blocked_id for row in UserBlock.query.filter_by(blocker_id=user_id)]
2024-03-21 16:22:19 -07:00
@cache.memoize(timeout=86400)
def blocked_phrases() -> List[str]:
    """Return the non-empty lines of the site's ``blocked_phrases`` setting. Memoized for a day."""
    site = Site.query.get(1)
    if not site.blocked_phrases:
        return []
    return [line for line in site.blocked_phrases.split('\n') if line != '']
2024-03-21 18:35:51 -07:00
@cache.memoize(timeout=86400)
def blocked_referrers() -> List[str]:
    """Return the non-empty lines of the site's ``auto_decline_referrers`` setting. Memoized for a day."""
    site = Site.query.get(1)
    if not site.auto_decline_referrers:
        return []
    return [line for line in site.auto_decline_referrers.split('\n') if line != '']
2023-10-20 20:20:13 -07:00
def retrieve_block_list():
    """Download the no-qanon domain block list and return its text.

    Returns None when the request fails or does not answer with HTTP 200.
    """
    try:
        response = requests.get('https://raw.githubusercontent.com/rimu/no-qanon/master/domains.txt', timeout=1)
    except requests.exceptions.RequestException:
        # Only network-level failures are expected here; a bare except would
        # also swallow KeyboardInterrupt and SystemExit.
        return None
    if response and response.status_code == 200:
        return response.text
    return None
2024-03-04 12:07:26 -08:00
def retrieve_peertube_block_list():
    """Download the peertube isolation list and return its values, one per line.

    Returns None when the request fails and '' when the server does not answer
    with HTTP 200.
    """
    try:
        response = requests.get('https://peertube_isolation.frama.io/list/peertube_isolation.json', timeout=1)
    except requests.exceptions.RequestException:
        # Only network-level failures are expected here; a bare except would
        # also swallow KeyboardInterrupt and SystemExit.
        return None
    blocked = ''
    if response and response.status_code == 200:
        response_data = response.json()
        blocked = "\n".join(row['value'] for row in response_data['data'])
    return blocked.strip()
2023-12-23 19:20:18 -08:00
def ensure_directory_exists(directory):
    """Create ``directory`` and any missing parent directories.

    Existing directories are left alone. Uses ``os.makedirs`` so that absolute
    paths work (the previous component-by-component loop tried to create ''
    for a leading '/') and a directory created concurrently is not an error.
    """
    os.makedirs(directory, exist_ok=True)
2024-02-27 15:55:30 -08:00
def mimetype_from_url(url):
    """Guess the MIME type of ``url`` from the extension in its path; None if unknown."""
    url_path = urlparse(url).path
    without_query = url_path.split('?')[0]  # Strip off anything after '?'
    return mimetypes.guess_type(without_query)[0]
2023-10-22 17:03:35 -07:00
def validation_required(func):
    """View decorator: redirect users who are not verified to 'auth.validation_required'."""
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if not current_user.verified:
            return redirect(url_for('auth.validation_required'))
        return func(*args, **kwargs)

    return decorated_view
def permission_required(permission):
    """View decorator factory: only run the view when the current user has ``permission``."""
    def decorator(func):
        @wraps(func)
        def decorated_view(*args, **kwargs):
            if not user_access(permission, current_user.id):
                # Handle the case where the user doesn't have the required permission
                return redirect(url_for('auth.permission_denied'))
            return func(*args, **kwargs)

        return decorated_view

    return decorator
2023-11-29 23:57:51 -08:00
# sends the user back to where they came from
def back(default_url):
    """Redirect to the request's referrer, or to ``default_url`` when there is none.

    ``default_url`` is also used when the referrer equals the current request URL.
    """
    referrer = request.referrer
    came_from_elsewhere = referrer and referrer != request.url
    return redirect(referrer if came_from_elsewhere else default_url)
2023-12-07 20:13:38 -08:00
# format a datetime in a way that is used in ActivityPub
def ap_datetime(date_time: datetime) -> str:
    """Format ``date_time`` the way ActivityPub expects: ISO 8601 with a '+00:00' offset.

    Naive datetimes are formatted as-is with '+00:00' appended. Timezone-aware
    datetimes are converted to UTC first, so the result never carries two
    offsets.
    """
    if date_time.tzinfo is not None and date_time.utcoffset() is not None:
        from datetime import timezone
        date_time = date_time.astimezone(timezone.utc).replace(tzinfo=None)
    return date_time.isoformat() + '+00:00'
2023-12-13 00:04:11 -08:00
class MultiCheckboxField(SelectMultipleField):
    """A SelectMultipleField that uses a ListWidget of CheckboxInput options."""

    # prefix_label=False is passed through to ListWidget
    widget = ListWidget(prefix_label=False)
    option_widget = CheckboxInput()
def ip_address() -> str:
    """Return the client's IP address.

    Uses the first entry of the X-Forwarded-For header when present, otherwise
    ``request.remote_addr``. When neither is available the falsy value is
    returned unchanged instead of raising TypeError on the comma check.
    """
    ip = request.headers.get('X-Forwarded-For') or request.remote_addr
    if ip and ',' in ip:  # Remove all but first ip addresses
        ip = ip[:ip.index(',')].strip()
    return ip
def user_ip_banned() -> bool:
    """Return True when the requesting IP address is in the banned IP list.

    Returns False (rather than falling through to None) when no IP address is
    available.
    """
    current_ip_address = ip_address()
    if not current_ip_address:
        return False
    return current_ip_address in banned_ip_addresses()
2024-01-02 19:29:58 -08:00
@cache.memoize(timeout=30)
def instance_banned(domain: str) -> bool:  # see also activitypub.util.instance_blocked()
    """Return True when a BannedInstances row exists for ``domain``. Memoized for 30 seconds."""
    if domain is None or domain == '':
        return False
    match = BannedInstances.query.filter_by(domain=domain).first()
    return match is not None
2023-12-29 22:03:44 -08:00
def user_cookie_banned() -> bool:
    """Return True when the request carries a cookie named 'sesion'.

    NOTE(review): the cookie name is spelled 'sesion' in the code; it is kept
    as-is here — confirm against wherever the cookie is set before changing it.
    """
    return request.cookies.get('sesion', None) is not None
2024-02-18 18:01:53 -08:00
@cache.memoize(timeout=30)
def banned_ip_addresses() -> List[str]:
    """Return the ``ip_address`` of every IpBan row. Memoized for 30 seconds."""
    return [ban.ip_address for ban in IpBan.query.all()]
2024-01-01 22:41:00 -08:00
2024-01-02 19:29:58 -08:00
def can_downvote(user, community: Community, site=None) -> bool:
    """Decide whether ``user`` may downvote content in ``community``.

    Returns False for a missing user or community, banned users and bots; when
    the site disables downvotes and the community is local; when the community
    is local-only and the user is not local; when the user's attitude or
    reputation is too low; and when the user is banned from the community.
    ``site`` defaults to ``g.site``, falling back to the Site row with id 1.
    """
    if user is None or community is None or user.banned or user.bot:
        return False

    if site is None:
        try:
            site = g.site
        except Exception:
            # g.site is not available: load the site row directly. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            site = Site.query.get(1)

    if not site.enable_downvotes and community.is_local():
        return False

    if community.local_only and not user.is_local():
        return False

    if user.attitude < -0.40 or user.reputation < -10:  # this should exclude about 3.7% of users.
        return False

    if community.id in communities_banned_from(user.id):
        return False

    return True
2024-01-02 19:29:58 -08:00
def can_upvote(user, community: Community) -> bool:
    """Return True unless the user/community is missing, the user is banned or a bot, or is banned from the community."""
    if user is None or community is None or user.banned or user.bot:
        return False
    return community.id not in communities_banned_from(user.id)
2024-02-23 14:07:06 -08:00
def can_create_post(user, content: Community) -> bool:
    """Decide whether ``user`` may create a post in the community ``content``.

    Moderators of the community and admins are always allowed (unless banned
    site-wide). Others are refused when the community is restricted to mods,
    is local-only while the user is not local, or has banned the user.
    """
    if user is None or content is None or user.banned:
        return False

    if content.is_moderator(user) or user.is_admin():
        return True

    refused = content.restricted_to_mods or (content.local_only and not user.is_local())
    if refused:
        return False

    return content.id not in communities_banned_from(user.id)
2024-01-01 22:41:00 -08:00
2024-02-23 14:07:06 -08:00
def can_create_post_reply(user, content: Community) -> bool:
    """Decide whether ``user`` may reply to posts in the community ``content``.

    Moderators of the community and admins are always allowed (unless banned
    site-wide). Others are refused when the community is local-only while the
    user is not local, or when the community has banned the user.
    """
    if user is None or content is None or user.banned:
        return False

    if content.is_moderator(user) or user.is_admin():
        return True

    if content.local_only and not user.is_local():
        return False

    return content.id not in communities_banned_from(user.id)
2024-01-02 19:29:58 -08:00
2024-01-05 17:54:10 -08:00
def reply_already_exists(user_id, post_id, parent_id, body) -> bool:
    """Return True when a non-deleted reply with the same author, post, parent and body already exists."""
    params = {'user_id': user_id, 'post_id': post_id, 'body': body}
    if parent_id is None:
        # top-level reply: match rows whose parent_id is null
        sql = 'SELECT COUNT(id) as c FROM "post_reply" WHERE deleted is false and user_id = :user_id AND post_id = :post_id AND parent_id is null AND body = :body'
    else:
        sql = 'SELECT COUNT(id) as c FROM "post_reply" WHERE deleted is false and user_id = :user_id AND post_id = :post_id AND parent_id = :parent_id AND body = :body'
        params['parent_id'] = parent_id
    num_matching_replies = db.session.execute(text(sql), params).scalar()
    return num_matching_replies != 0
def reply_is_just_link_to_gif_reaction(body) -> bool:
    """Return True when ``body`` (ignoring surrounding whitespace) starts with a known GIF host URL."""
    gif_hosts = (
        'https://media.tenor.com/',
        'https://media1.tenor.com/',
        'https://media2.tenor.com/',
        'https://media3.tenor.com/',
        'https://i.giphy.com/',
        'https://i.imgflip.com',
        'https://media1.giphy.com/',
        'https://media2.giphy.com/',
        'https://media3.giphy.com/',
        'https://media4.giphy.com/',
    )
    return body.strip().startswith(gif_hosts)
2024-04-21 20:25:37 -07:00
def reply_is_stupid(body) -> bool:
    """Return True when the reply is nothing but "this", "this." or "this!" (any case, surrounding whitespace ignored)."""
    return body.lower().strip() in ('this', 'this.', 'this!')
2024-01-02 19:29:58 -08:00
def inbox_domain(inbox: str) -> str:
    """Return the lower-cased hostname of an inbox URL; input without an http(s) scheme is just lower-cased."""
    lowered = inbox.lower()
    has_scheme = 'https://' in lowered or 'http://' in lowered
    return urlparse(lowered).hostname if has_scheme else lowered
def awaken_dormant_instance(instance):
    """Wake a dormant instance whose retry time has passed, or mark it as gone forever.

    Does nothing for a missing instance or one already marked gone_forever.
    A dormant instance with no ``start_trying_again`` value is left untouched
    instead of raising TypeError on the comparison.
    """
    if not instance or instance.gone_forever:
        return

    retry_at = instance.start_trying_again
    if instance.dormant and retry_at is not None and retry_at < utcnow():
        instance.dormant = False
        db.session.commit()

    # give up after ~5 days of trying
    if instance.start_trying_again and utcnow() + timedelta(days=5) < instance.start_trying_again:
        instance.gone_forever = True
        instance.dormant = True
        db.session.commit()
2024-01-02 23:14:39 -08:00
2024-01-09 12:44:59 -08:00
def shorten_number(number):
    """Format ``number`` compactly: as-is below 1000, then with one decimal and a 'k' or 'M' suffix."""
    if number < 1000:
        return str(number)
    divisor, suffix = (1000, 'k') if number < 1000000 else (1000000, 'M')
    return f'{number / divisor:.1f}{suffix}'
2024-01-10 23:39:22 -08:00
@cache.memoize(timeout=300)
def user_filters_home(user_id):
    """Collect the user's unexpired home-feed filter keywords, grouped for display.

    Returns a defaultdict(set): keywords of filters with hide_type 0 are keyed
    by the filter title, all others are merged under '-1'. Memoized for 5 minutes.
    """
    not_expired = or_(Filter.expire_after > date.today(), Filter.expire_after == None)
    active_filters = Filter.query.filter_by(user_id=user_id, filter_home=True).filter(not_expired)
    result = defaultdict(set)
    for user_filter in active_filters:
        keywords = [line.strip().lower() for line in user_filter.keywords.splitlines()]
        # type == 1 means hide completely. These posts are excluded from output by the jinja template
        bucket = user_filter.title if user_filter.hide_type == 0 else '-1'
        result[bucket].update(keywords)
    return result
@cache.memoize(timeout=300)
def user_filters_posts(user_id):
    """Collect the user's unexpired post filter keywords, grouped for display.

    Returns a defaultdict(set): keywords of filters with hide_type 0 are keyed
    by the filter title, all others are merged under '-1'. Memoized for 5 minutes.
    """
    not_expired = or_(Filter.expire_after > date.today(), Filter.expire_after == None)
    active_filters = Filter.query.filter_by(user_id=user_id, filter_posts=True).filter(not_expired)
    result = defaultdict(set)
    for user_filter in active_filters:
        keywords = [line.strip().lower() for line in user_filter.keywords.splitlines()]
        bucket = user_filter.title if user_filter.hide_type == 0 else '-1'
        result[bucket].update(keywords)
    return result
@cache.memoize(timeout=300)
def user_filters_replies(user_id):
    """Collect the user's unexpired reply filter keywords, grouped for display.

    Returns a defaultdict(set): keywords of filters with hide_type 0 are keyed
    by the filter title, all others are merged under '-1'. Memoized for 5 minutes.
    """
    not_expired = or_(Filter.expire_after > date.today(), Filter.expire_after == None)
    active_filters = Filter.query.filter_by(user_id=user_id, filter_replies=True).filter(not_expired)
    result = defaultdict(set)
    for user_filter in active_filters:
        keywords = [line.strip().lower() for line in user_filter.keywords.splitlines()]
        bucket = user_filter.title if user_filter.hide_type == 0 else '-1'
        result[bucket].update(keywords)
    return result
2024-01-09 12:44:59 -08:00
2024-01-11 15:34:08 -08:00
@cache.memoize(timeout=300)
def moderating_communities(user_id):
    """Return the non-banned communities ``user_id`` moderates or owns, ordered by title.

    Memberships flagged as banned are excluded. Returns [] for a missing or
    zero user id. Memoized for 5 minutes.
    """
    if user_id is None or user_id == 0:
        return []
    query = (
        Community.query
        .join(CommunityMember, Community.id == CommunityMember.community_id)
        .filter(Community.banned == False)
        .filter(or_(CommunityMember.is_moderator == True, CommunityMember.is_owner == True))
        .filter(CommunityMember.is_banned == False)
        .filter(CommunityMember.user_id == user_id)
        .order_by(Community.title)
    )
    return query.all()
@cache.memoize(timeout=300)
def joined_communities(user_id):
    """Return the non-banned communities the user belongs to without being moderator or owner, ordered by title.

    Memberships flagged is_banned are excluded. An empty list is returned when
    user_id is None or 0.
    """
    if user_id is None or user_id == 0:
        return []
    query = (
        Community.query
        .join(CommunityMember, Community.id == CommunityMember.community_id)
        .filter(Community.banned == False)  # noqa: E712
        .filter(CommunityMember.is_moderator == False, CommunityMember.is_owner == False)  # noqa: E712
        .filter(CommunityMember.is_banned == False)  # noqa: E712
        .filter(CommunityMember.user_id == user_id)
        .order_by(Community.title)
    )
    return query.all()
2024-05-30 02:54:25 -07:00
@cache.memoize(timeout=3000)
def menu_topics():
    """Return all topics that have no parent, ordered by name."""
    top_level_topics = Topic.query.filter(Topic.parent_id == None)  # noqa: E711 (SQL NULL test)
    return top_level_topics.order_by(Topic.name).all()
2024-03-12 20:40:20 -07:00
@cache.memoize(timeout=300)
def community_moderators(community_id):
    """Return the CommunityMember rows of a community that are flagged as owner or moderator."""
    in_community = CommunityMember.community_id == community_id
    owner_or_moderator = or_(CommunityMember.is_owner, CommunityMember.is_moderator)
    return CommunityMember.query.filter(in_community & owner_or_moderator).all()
2024-02-01 18:30:03 -08:00
def finalize_user_setup(user, application_required=False):
    """Mark a user as verified, give them an RSA key pair and ActivityPub URLs, then send the welcome email.

    The changes are committed to the database before send_welcome_email() is called.
    """
    from app.activitypub.signature import RsaKeys

    user.verified = True
    user.last_seen = utcnow()
    user.private_key, user.public_key = RsaKeys.generate_keypair()

    server_name = current_app.config['SERVER_NAME']
    public_url = f"https://{server_name}/u/{user.user_name}"
    user.ap_profile_id = public_url.lower()  # the whole URL is lower-cased, host included
    user.ap_public_url = public_url
    user.ap_inbox_url = f"https://{server_name}/u/{user.user_name.lower()}/inbox"

    db.session.commit()
    send_welcome_email(user, application_required)
2024-01-11 15:34:08 -08:00
2024-04-29 02:43:37 -07:00
def notification_subscribers(entity_id: int, entity_type: int) -> List[int]:
    """Return the user ids in notification_subscription that match the given entity id and type."""
    query = text('SELECT user_id FROM "notification_subscription" WHERE entity_id = :entity_id AND type = :type')
    params = {'entity_id': entity_id, 'type': entity_type}
    return list(db.session.execute(query, params).scalars())
2024-04-28 21:03:00 -07:00
2024-04-08 00:48:25 -07:00
# topics, in a tree
def topic_tree() -> List:
    """Return every topic arranged as a tree.

    Each node is a dict of the form {'topic': Topic, 'children': [child nodes]}.
    The returned list holds the nodes whose topic has no parent. Topics are
    loaded ordered by name, so children are appended in that order. A topic
    whose parent_id does not match any loaded topic is not attached to any node.
    """
    # Materialise the query once so both passes below work on the same rows
    # without hitting the database a second time.
    topics = Topic.query.order_by(Topic.name).all()
    nodes = {topic.id: {'topic': topic, 'children': []} for topic in topics}
    for topic in topics:
        if topic.parent_id is not None:
            parent_node = nodes.get(topic.parent_id)
            if parent_node:
                parent_node['children'].append(nodes[topic.id])
    return [node for node in nodes.values() if node['topic'].parent_id is None]
2024-01-07 00:36:04 -08:00
# All the following post/comment ranking math is explained at https://medium.com/hacking-and-gonzo/how-reddit-ranking-algorithms-work-ef111e33d0d9
2024-01-02 23:14:39 -08:00
epoch = datetime(1970, 1, 1)
def epoch_seconds(date):
    """Return the seconds elapsed between the module-level `epoch` (1970-01-01) and `date`, as a float."""
    elapsed = date - epoch
    whole_seconds = elapsed.days * 86400 + elapsed.seconds
    fraction = float(elapsed.microseconds) / 1000000
    return whole_seconds + fraction
2024-01-04 11:11:35 -08:00
2024-01-02 23:14:39 -08:00
def post_ranking(score, date: datetime):
    """Compute the ranking value of a post from its score and posting time.

    The result is sign(score) * log10(max(|score|, 1)) plus the number of
    seconds between `date` and the fixed offset 1685766018, divided by 45000,
    rounded to 7 decimal places. A missing date defaults to the current UTC
    time and a missing score defaults to 1.
    """
    if date is None:
        date = datetime.utcnow()
    if score is None:
        score = 1

    magnitude = math.log(max(abs(score), 1), 10)
    if score > 0:
        direction = 1
    elif score < 0:
        direction = -1
    else:
        direction = 0
    age_offset = epoch_seconds(date) - 1685766018
    return round(direction * magnitude + age_offset / 45000, 7)
2024-01-07 00:36:04 -08:00
# used for ranking comments
def _confidence ( ups , downs ) :
n = ups + downs
if n == 0 :
return 0.0
z = 1.281551565545
p = float ( ups ) / n
left = p + 1 / ( 2 * n ) * z * z
right = z * math . sqrt ( p * ( 1 - p ) / n + z * z / ( 4 * n * n ) )
under = 1 + 1 / n * z * z
return ( left - right ) / under
def confidence(ups, downs) -> float:
    """Return the comment ranking confidence for the given vote counts.

    Missing (None) or negative counts are treated as 0. With no votes the
    result is 0.0; otherwise the value comes from _confidence().
    """
    ups = 0 if ups is None or ups < 0 else ups
    downs = 0 if downs is None or downs < 0 else downs
    if ups + downs == 0:
        return 0.0
    return _confidence(ups, downs)
2024-02-04 01:02:32 -08:00
2024-06-21 23:18:26 -07:00
def opengraph_parse(url):
    """Run parse_page() on `url` with any query string removed.

    Returns whatever parse_page() returns, or None if it raises an exception.
    """
    if '?' in url:
        url = url.partition('?')[0]
    try:
        return parse_page(url)
    except Exception:
        return None
def url_to_thumbnail_file(filename) -> File:
    """Download the image at the URL `filename` and store a thumbnail of it on disk.

    The image is written to a randomly named file under app/static/media/posts/
    and then shrunk in place to fit within 150x150 pixels.

    Returns a File describing the thumbnail, or None if the download fails, the
    response status is not 200 or the response is not an image.
    """
    try:
        timeout = 15 if 'washingtonpost.com' in filename else 5  # Washington Post is really slow for some reason
        response = requests.get(filename, timeout=timeout)
    except Exception:
        return None
    if response.status_code != 200:
        return None
    content_type = response.headers.get('content-type')
    if not content_type or not content_type.startswith('image'):
        return None

    # Generate file extension from mime type, ignoring parameters such as "; charset=utf-8"
    mime_type = content_type.split(';')[0].strip()
    if '/' in mime_type:
        file_extension = '.' + mime_type.split('/')[-1]
        if file_extension == '.jpeg':
            file_extension = '.jpg'
    else:
        # The header has no subtype to use, so fall back to the extension in the URL
        file_extension = os.path.splitext(filename)[1]
        file_extension = file_extension.replace('%3f', '?')  # sometimes urls are not decoded properly
        if '?' in file_extension:
            file_extension = file_extension.split('?')[0]

    new_filename = gibberish(15)
    directory = 'app/static/media/posts/' + new_filename[0:2] + '/' + new_filename[2:4]
    ensure_directory_exists(directory)
    final_place = os.path.join(directory, new_filename + file_extension)
    with open(final_place, 'wb') as f:
        f.write(response.content)
    response.close()

    Image.MAX_IMAGE_PIXELS = 89478485
    with Image.open(final_place) as img:
        img = ImageOps.exif_transpose(img)
        img.thumbnail((150, 150))
        img.save(final_place)
        thumbnail_width = img.width
        thumbnail_height = img.height
    return File(file_name=new_filename + file_extension, thumbnail_width=thumbnail_width,
                thumbnail_height=thumbnail_height, thumbnail_path=final_place,
                source_url=filename)
2024-02-04 01:02:32 -08:00
# By no means is this a complete list, but it is very easy to search for the ones you need later.
KNOWN_OPENGRAPH_TAGS = [
    # page-level properties
    "og:site_name", "og:title", "og:locale", "og:type", "og:image", "og:url",
    # structured properties of og:image
    "og:image:url", "og:image:secure_url", "og:image:type",
    "og:image:width", "og:image:height", "og:image:alt",
]
def parse_page(page_url, tags_to_search=KNOWN_OPENGRAPH_TAGS, fallback_tags=None):
    """
    Parses a page, returns a JSON style dictionary of all OG tags found on that page.

    Passing in tags_to_search is optional. By default it will search through the
    KNOWN_OPENGRAPH_TAGS constant, but for the sake of efficiency, you may want to
    only search for 1 or 2 tags.

    fallback_tags optionally maps an OG tag name to the name of an HTML element
    whose text is used when the page has no <meta> element for that tag.

    Returns False if the page is unreadable (response status other than 200).
    """
    # read the html from the page
    response = get_request(page_url)
    if response.status_code != 200:
        return False

    # set up beautiful soup
    soup = BeautifulSoup(response.content, 'html.parser')

    # loop through the known list of opengraph tags, searching for each and adding to a dictionary as we go.
    found_tags = {}
    for og_tag in tags_to_search:
        meta_tag = soup.find("meta", property=og_tag)
        if meta_tag is not None:
            # a <meta property=...> without a content attribute carries no value, so it is skipped
            content = meta_tag.get("content")
            if content is not None:
                found_tags[meta_tag["property"]] = content
        elif fallback_tags is not None and og_tag in fallback_tags:
            # find() returns None when the fallback element is absent from the page
            fallback_element = soup.find(fallback_tags[og_tag])
            if fallback_element is not None:
                found_tags[og_tag] = fallback_element.text
    return found_tags
2024-02-06 20:31:12 -08:00
def current_theme():
    """The theme the current user has set, falling back to the site default if none specified or user is not logged in"""
    if current_user.is_authenticated:
        user_theme = current_user.theme
        if user_theme is not None and user_theme != '':
            return user_theme
        # Logged in but no personal theme: use the site on g if present, otherwise load it
        site = g.site if hasattr(g, 'site') else Site.query.get(1)
    else:
        site = Site.query.get(1)
    default_theme = site.default_theme
    return '' if default_theme is None else default_theme
2024-02-06 21:33:25 -08:00
def theme_list():
    """All the themes available, by looking in the templates/themes directory.

    Returns a list of (directory name, display name) tuples, starting with the
    built-in ('', 'PieFed') entry. A directory counts as a theme when it holds a
    <name>/<name>.json file; the display name is that file's 'name' value.
    """
    themes_root = 'app/templates/themes'
    themes = [('', 'PieFed')]
    for _root, subdirectories, _files in os.walk(themes_root):
        for theme_dir in subdirectories:
            settings_path = f'{themes_root}/{theme_dir}/{theme_dir}.json'
            if not os.path.exists(settings_path):
                continue
            theme_settings = json.loads(file_get_contents(settings_path))
            themes.append((theme_dir, theme_settings['name']))
    return themes
2024-02-24 19:24:50 -08:00
def sha256_digest(input_string):
    """
    Compute the SHA-256 hash digest of a given string.

    Args:
    - input_string: The string to hash; it is encoded as UTF-8 first.

    Returns:
    - A hexadecimal string representing the SHA-256 hash digest.
    """
    encoded = input_string.encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()
2024-03-07 17:33:58 -08:00
2024-03-08 01:01:46 -08:00
def remove_tracking_from_link(url):
    """Strip query parameters from youtu.be links; every other URL is returned unchanged.

    For a youtu.be URL only the video id and, if present, the 't' query
    parameter are kept, and the result always uses the https scheme.
    """
    parts = urlparse(url)
    if parts.netloc != 'youtu.be':
        return url

    video_id = parts.path[1:]  # drop the leading slash
    cleaned_url = f"https://youtu.be/{video_id}"

    # Preserve 't' parameter if it exists
    params = parse_qs(parts.query)
    if 't' in params:
        kept_query = urlencode({'t': params['t']}, doseq=True)
        if kept_query:
            cleaned_url += f"?{kept_query}"
    return cleaned_url
2024-04-03 00:48:39 -07:00
def show_ban_message():
    """Flash a ban notice, log the current user out and redirect to the main index.

    The redirect response also sets a 'sesion' cookie that expires on 2099-12-30.
    """
    flash('You have been banned.', 'error')
    logout_user()
    response = make_response(redirect(url_for('main.index')))
    cookie_expiry = datetime(year=2099, month=12, day=30)
    response.set_cookie('sesion', '17489047567495', expires=cookie_expiry)
    return response
2024-04-10 19:04:57 -07:00
# search a sorted list using a binary search. Faster than using 'in' with an unsorted list.
def in_sorted_list(arr, target):
    """Return whether `target` occurs in `arr`, which must already be sorted in ascending order."""
    position = bisect.bisect_left(arr, target)
    if position >= len(arr):
        return False
    return arr[position] == target
2024-04-15 21:35:12 -07:00
# Makes a still image from a video url, without downloading the whole video file
def generate_image_from_video_url(video_url, output_path, length=2):
    """Save the first frame of the video at `video_url` as an image at `output_path`.

    Only about the first `length` megabytes of the video are downloaded, into a
    temporary file that is removed again whether or not the frame could be
    extracted.

    Raises:
        ValueError: if the response has no Content-Type header or the type is
            neither video/mp4 nor video/webm.
        Any exception raised while downloading, opening the clip or saving the
        image is propagated to the caller.
    """
    headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0'}  # Imgur requires a user agent
    temp_file_path = None
    try:
        # The context manager closes the streamed connection even when we stop reading early
        with requests.get(video_url, stream=True, headers=headers) as response:
            content_type = response.headers.get('Content-Type')
            if not content_type:
                raise ValueError("Content-Type not found in response headers")
            if 'video/mp4' in content_type:
                temp_file_extension = '.mp4'
            elif 'video/webm' in content_type:
                temp_file_extension = '.webm'
            else:
                raise ValueError("Unsupported video format")

            # Generate a random temporary file name
            temp_file_path = os.path.join(tempfile.gettempdir(), gibberish(15) + temp_file_extension)

            # Write the downloaded data to the temporary file, counting bytes ourselves
            # rather than asking the filesystem for the size after every chunk
            max_bytes = length * 1024 * 1024
            bytes_written = 0
            with open(temp_file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=4096):
                    f.write(chunk)
                    bytes_written += len(chunk)
                    if bytes_written >= max_bytes:
                        break

        # Generate thumbnail from the temporary file
        clip = VideoFileClip(temp_file_path)
        try:
            thumbnail = clip.get_frame(0)
        finally:
            clip.close()

        # Save the image
        Image.fromarray(thumbnail).save(output_path)
    finally:
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.remove(temp_file_path)
2024-04-10 19:04:57 -07:00
@cache.memoize(timeout=600)
def recently_upvoted_posts(user_id) -> List[int]:
    """Return, in ascending order, the post ids of the user's 1000 latest (highest id) upvotes."""
    query = text('SELECT post_id FROM "post_vote" WHERE user_id = :user_id AND effect > 0 ORDER BY id DESC LIMIT 1000')
    post_ids = list(db.session.execute(query, {'user_id': user_id}).scalars())
    post_ids.sort()  # sorted so that in_sorted_list can be used
    return post_ids
@cache.memoize(timeout=600)
def recently_downvoted_posts(user_id) -> List[int]:
    """Return, in ascending order, the post ids of the user's 1000 latest (highest id) downvotes."""
    query = text('SELECT post_id FROM "post_vote" WHERE user_id = :user_id AND effect < 0 ORDER BY id DESC LIMIT 1000')
    post_ids = list(db.session.execute(query, {'user_id': user_id}).scalars())
    post_ids.sort()
    return post_ids
@cache.memoize(timeout=600)
def recently_upvoted_post_replies(user_id) -> List[int]:
    """Return, in ascending order, the reply ids of the user's 1000 latest (highest id) reply upvotes."""
    query = text('SELECT post_reply_id FROM "post_reply_vote" WHERE user_id = :user_id AND effect > 0 ORDER BY id DESC LIMIT 1000')
    reply_ids = list(db.session.execute(query, {'user_id': user_id}).scalars())
    reply_ids.sort()  # sorted so that in_sorted_list can be used
    return reply_ids
@cache.memoize(timeout=600)
def recently_downvoted_post_replies(user_id) -> List[int]:
    """Return, in ascending order, the reply ids of the user's 1000 latest (highest id) reply downvotes."""
    query = text('SELECT post_reply_id FROM "post_reply_vote" WHERE user_id = :user_id AND effect < 0 ORDER BY id DESC LIMIT 1000')
    reply_ids = list(db.session.execute(query, {'user_id': user_id}).scalars())
    reply_ids.sort()
    return reply_ids
2024-05-08 02:07:22 -07:00
def languages_for_form():
    """Return (id, name) choices for a language select field.

    For a logged-in user, up to 10 languages they most recently used in posts
    or replies come first, most recent first. All remaining languages follow,
    ordered by name, leaving out the one with code 'und'.
    """
    result = []
    used_language_ids = set()
    if current_user.is_authenticated:
        recent_rows = db.session.execute(text("""SELECT language_id
                                FROM (
                                    SELECT language_id, posted_at
                                    FROM "post"
                                    WHERE user_id = :user_id
                                    UNION ALL
                                    SELECT language_id, posted_at
                                    FROM "post_reply"
                                    WHERE user_id = :user_id
                                ) AS subquery
                                GROUP BY language_id
                                ORDER BY MAX(posted_at) DESC
                                LIMIT 10"""),
                                         {'user_id': current_user.id}).scalars()
        # Posts and replies may have no language set; those NULL ids are dropped
        recent_language_ids = [language_id for language_id in recent_rows if language_id is not None]
        if recent_language_ids:
            # An IN (...) query does not return rows in the order of the id list, so look
            # the languages up by id and walk the ids in recency order instead.
            languages_by_id = {
                language.id: language
                for language in Language.query.filter(Language.id.in_(recent_language_ids)).all()
            }
            for language_id in recent_language_ids:
                language = languages_by_id.get(language_id)
                if language is not None:
                    result.append((language.id, language.name))
                    used_language_ids.add(language.id)

    for language in Language.query.order_by(Language.name).all():
        if language.code != 'und' and language.id not in used_language_ids:
            result.append((language.id, language.name))

    return result
2024-05-08 22:54:30 -07:00
def english_language_id():
    """Return the id of the first Language whose code is 'en', or None if there is none."""
    english = Language.query.filter(Language.code == 'en').first()
    if english:
        return english.id
    return None
2024-05-15 20:44:42 -07:00
def actor_contains_blocked_words(actor):
    """Return True if `actor` contains any word from the 'actor_blocked_words' setting.

    The setting is read as one word per line. Matching is case-insensitive and
    by substring, after stripping surrounding whitespace from both sides.
    """
    actor = actor.lower().strip()
    blocked_words = get_setting('actor_blocked_words')
    if blocked_words and blocked_words.strip() != '':
        for blocked_word in blocked_words.split('\n'):
            blocked_word = blocked_word.lower().strip()
            # Skip blank lines: the empty string is a substring of every actor,
            # so a stray empty line would otherwise block everyone.
            if blocked_word and blocked_word in actor:
                return True
    return False