mirror of
https://codeberg.org/rimu/pyfedi
synced 2025-01-23 19:36:56 -08:00
136 lines
5.6 KiB
Python
136 lines
5.6 KiB
Python
import os
|
|
from flask import current_app
|
|
from sqlalchemy import text
|
|
from app import db
|
|
from app.models import User, Post, Community
|
|
import time
|
|
import base64
|
|
import requests
|
|
from cryptography.hazmat.primitives import serialization, hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from app.constants import *
|
|
|
|
|
|
def public_key():
|
|
if not os.path.exists('./public.pem'):
|
|
os.system('openssl genrsa -out private.pem 2048')
|
|
os.system('openssl rsa -in private.pem -outform PEM -pubout -out public.pem')
|
|
else:
|
|
publicKey = open('./public.pem', 'r').read()
|
|
PUBLICKEY = publicKey.replace('\n', '\\n') # JSON-LD doesn't want to work with linebreaks,
|
|
# but needs the \n character to know where to break the line ;)
|
|
return PUBLICKEY
|
|
|
|
|
|
def users_total():
|
|
return db.session.execute(text(
|
|
'SELECT COUNT(id) as c FROM "user" WHERE ap_id is null AND verified is true AND banned is false AND deleted is false')).scalar()
|
|
|
|
|
|
def active_half_year():
|
|
return db.session.execute(text(
|
|
"SELECT COUNT(id) as c FROM \"user\" WHERE last_seen >= CURRENT_DATE - INTERVAL '6 months' AND ap_id is null AND verified is true AND banned is false AND deleted is false")).scalar()
|
|
|
|
|
|
def active_month():
|
|
return db.session.execute(text(
|
|
"SELECT COUNT(id) as c FROM \"user\" WHERE last_seen >= CURRENT_DATE - INTERVAL '1 month' AND ap_id is null AND verified is true AND banned is false AND deleted is false")).scalar()
|
|
|
|
|
|
def local_posts():
|
|
return db.session.execute(text('SELECT COUNT(id) as c FROM "post" WHERE ap_id is null')).scalar()
|
|
|
|
|
|
def local_comments():
|
|
return db.session.execute(text('SELECT COUNT(id) as c FROM "post_reply" WHERE ap_id is null')).scalar()
|
|
|
|
|
|
def send_activity(sender: User, host: str, content: str):
|
|
|
|
date = time.strftime('%a, %d %b %Y %H:%M:%S UTC', time.gmtime())
|
|
|
|
private_key = serialization.load_pem_private_key(sender.private_key, password=None)
|
|
|
|
# todo: look up instance details to set host_inbox
|
|
host_inbox = '/inbox'
|
|
|
|
signed_string = f"(request-target): post {host_inbox}\nhost: {host}\ndate: " + date
|
|
signature = private_key.sign(signed_string.encode('utf-8'), padding.PKCS1v15(), hashes.SHA256())
|
|
encoded_signature = base64.b64encode(signature).decode('utf-8')
|
|
|
|
# Construct the Signature header
|
|
header = f'keyId="https://{current_app.config["SERVER_NAME"]}/u/{sender.user_name}",headers="(request-target) host date",signature="{encoded_signature}"'
|
|
|
|
# Create headers for the request
|
|
headers = {
|
|
'Host': host,
|
|
'Date': date,
|
|
'Signature': header
|
|
}
|
|
|
|
# Make the HTTP request
|
|
try:
|
|
response = requests.post(f'https://{host}{host_inbox}', headers=headers, data=content,
|
|
timeout=REQUEST_TIMEOUT)
|
|
except requests.exceptions.RequestException:
|
|
time.sleep(1)
|
|
response = requests.post(f'https://{host}{host_inbox}', headers=headers, data=content,
|
|
timeout=REQUEST_TIMEOUT / 2)
|
|
return response.status_code
|
|
|
|
|
|
def post_to_activity(post: Post, community: Community):
|
|
activity_data = {
|
|
"actor": f"https://{current_app.config['SERVER_NAME']}/c/{community.name}",
|
|
"to": [
|
|
"https://www.w3.org/ns/activitystreams#Public"
|
|
],
|
|
"object": {
|
|
"id": f"https://{current_app.config['SERVER_NAME']}/activities/create/{post.ap_create_id}",
|
|
"actor": f"https://{current_app.config['SERVER_NAME']}/u/{post.author.user_name}",
|
|
"to": [
|
|
"https://www.w3.org/ns/activitystreams#Public"
|
|
],
|
|
"object": {
|
|
"type": "Page",
|
|
"id": f"https://{current_app.config['SERVER_NAME']}/post/{post.id}",
|
|
"attributedTo": f"https://{current_app.config['SERVER_NAME']}/u/{post.author.user_name}",
|
|
"to": [
|
|
f"https://{current_app.config['SERVER_NAME']}/c/{community.name}",
|
|
"https://www.w3.org/ns/activitystreams#Public"
|
|
],
|
|
"name": post.title,
|
|
"cc": [],
|
|
"content": post.body_html,
|
|
"mediaType": "text/html",
|
|
"source": {
|
|
"content": post.body,
|
|
"mediaType": "text/markdown"
|
|
},
|
|
"attachment": [],
|
|
"commentsEnabled": True,
|
|
"sensitive": post.nsfw or post.nsfl,
|
|
"published": post.created_at,
|
|
"audience": f"https://{current_app.config['SERVER_NAME']}/c/{community.name}"
|
|
},
|
|
"cc": [
|
|
f"https://{current_app.config['SERVER_NAME']}/c/{community.name}"
|
|
],
|
|
"type": "Create",
|
|
"audience": f"https://{current_app.config['SERVER_NAME']}/c/{community.name}"
|
|
},
|
|
"cc": [
|
|
f"https://{current_app.config['SERVER_NAME']}/c/{community.name}/followers"
|
|
],
|
|
"type": "Announce",
|
|
"id": f"https://{current_app.config['SERVER_NAME']}/activities/announce/{post.ap_announce_id}"
|
|
}
|
|
if post.edited_at is not None:
|
|
activity_data["object"]["object"]["updated"] = post.edited_at
|
|
if post.language is not None:
|
|
activity_data["object"]["object"]["language"] = {"identifier": post.language}
|
|
if post.type == POST_TYPE_LINK and post.url is not None:
|
|
activity_data["object"]["object"]["attachment"] = {"href": post.url, "type": "Link"}
|
|
if post.image_id is not None:
|
|
activity_data["object"]["object"]["image"] = {"href": post.image.source_url, "type": "Image"}
|
|
return activity_data
|