mirror of
https://codeberg.org/rimu/pyfedi
synced 2025-01-23 19:36:56 -08:00
track recent activities using redis instead of db
This commit is contained in:
parent
691236c7cf
commit
958c8f226f
5 changed files with 74 additions and 13 deletions
|
@ -116,7 +116,7 @@ pip install -r requirements.txt
|
|||
* `SERVER_NAME` should be the domain of the site/instance. Use `127.0.0.1:5000` during development unless using ngrok.
|
||||
* `RECAPTCHA_PUBLIC_KEY` and `RECAPTCHA_PRIVATE_KEY` can be generated at https://www.google.com/recaptcha/admin/create (this is optional - omit to allow registration without RECAPCHA).
|
||||
* `CACHE_TYPE` can be `FileSystemCache` or `RedisCache`. `FileSystemCache` is fine during development (set `CACHE_DIR` to `/tmp/piefed` or `/dev/shm/piefed`)
|
||||
while `RedisCache` **should** be used in production. If using `RedisCache`, set `CACHE_REDIS_URL` to `redis://localhost:6379/1`
|
||||
while `RedisCache` **should** be used in production. If using `RedisCache`, set `CACHE_REDIS_URL` to `redis://localhost:6379/1`. Visit https://yourdomain/testredis to check if your redis url is working.
|
||||
|
||||
* `CELERY_BROKER_URL` is similar to `CACHE_REDIS_URL` but with a different number on the end: `redis://localhost:6379/0`
|
||||
|
||||
|
|
|
@ -18,8 +18,6 @@ from sqlalchemy_searchable import make_searchable
|
|||
|
||||
from config import Config
|
||||
|
||||
import re
|
||||
|
||||
|
||||
def get_locale():
|
||||
try:
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
from flask_login import current_user
|
||||
|
||||
from app import db, constants, cache, celery
|
||||
|
@ -16,9 +18,9 @@ from app.models import User, Community, CommunityJoinRequest, CommunityMember, C
|
|||
from app.activitypub.util import public_key, users_total, active_half_year, active_month, local_posts, local_comments, \
|
||||
post_to_activity, find_actor_or_create, default_context, instance_blocked, find_reply_parent, find_liked_object, \
|
||||
lemmy_site_data, instance_weight, is_activitypub_request, downvote_post_reply, downvote_post, upvote_post_reply, \
|
||||
upvote_post, activity_already_ingested, delete_post_or_comment, community_members, \
|
||||
upvote_post, delete_post_or_comment, community_members, \
|
||||
user_removed_from_remote_server, create_post, create_post_reply, update_post_reply_from_activity, \
|
||||
update_post_from_activity, undo_vote, undo_downvote, post_to_page
|
||||
update_post_from_activity, undo_vote, undo_downvote, post_to_page, get_redis_connection
|
||||
from app.utils import gibberish, get_setting, is_image_url, allowlist_html, render_template, \
|
||||
domain_from_url, markdown_to_html, community_membership, ap_datetime, ip_address, can_downvote, \
|
||||
can_upvote, can_create_post, awaken_dormant_instance, shorten_string, can_create_post_reply, sha256_digest, \
|
||||
|
@ -26,6 +28,17 @@ from app.utils import gibberish, get_setting, is_image_url, allowlist_html, rend
|
|||
import werkzeug.exceptions
|
||||
|
||||
|
||||
@bp.route('/testredis')
|
||||
def testredis_get():
|
||||
redis_client = get_redis_connection()
|
||||
redis_client.set("cowbell", "1", ex=600)
|
||||
x = redis_client.get('cowbell')
|
||||
if x == '1':
|
||||
return "Redis: OK"
|
||||
else:
|
||||
return "Redis: FAIL"
|
||||
|
||||
|
||||
@bp.route('/.well-known/webfinger')
|
||||
def webfinger():
|
||||
if request.args.get('resource'):
|
||||
|
@ -350,13 +363,15 @@ def shared_inbox():
|
|||
return ''
|
||||
|
||||
if 'id' in request_json:
|
||||
if activity_already_ingested(request_json['id']): # Lemmy has an extremely short POST timeout and tends to retry unnecessarily. Ignore their retries.
|
||||
redis_client = get_redis_connection()
|
||||
if redis_client.get(request_json['id']) is not None: # Lemmy has an extremely short POST timeout and tends to retry unnecessarily. Ignore their retries.
|
||||
activity_log.result = 'ignored'
|
||||
activity_log.exception_message = 'Unnecessary retry attempt'
|
||||
db.session.add(activity_log)
|
||||
db.session.commit()
|
||||
return ''
|
||||
|
||||
redis_client.set(request_json['id'], 1, ex=90) # Save the activity ID into redis, to avoid duplicate activities that Lemmy sometimes sends
|
||||
activity_log.activity_id = request_json['id']
|
||||
if g.site.log_activitypub_json:
|
||||
activity_log.activity_json = json.dumps(request_json)
|
||||
|
|
|
@ -5,6 +5,8 @@ import os
|
|||
from datetime import timedelta
|
||||
from random import randint
|
||||
from typing import Union, Tuple
|
||||
|
||||
import redis
|
||||
from flask import current_app, request, g, url_for
|
||||
from flask_babel import _
|
||||
from sqlalchemy import text, func
|
||||
|
@ -17,7 +19,7 @@ import requests
|
|||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
from app.constants import *
|
||||
from urllib.parse import urlparse
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from PIL import Image, ImageOps
|
||||
from io import BytesIO
|
||||
import pytesseract
|
||||
|
@ -1008,11 +1010,6 @@ def is_activitypub_request():
|
|||
return 'application/ld+json' in request.headers.get('Accept', '') or 'application/activity+json' in request.headers.get('Accept', '')
|
||||
|
||||
|
||||
def activity_already_ingested(ap_id):
|
||||
return db.session.execute(text('SELECT id FROM "activity_pub_log" WHERE activity_id = :activity_id'),
|
||||
{'activity_id': ap_id}).scalar()
|
||||
|
||||
|
||||
def downvote_post(post, user):
|
||||
user.last_seen = utcnow()
|
||||
user.recalculate_attitude()
|
||||
|
@ -1626,6 +1623,57 @@ def undo_vote(activity_log, comment, post, target_ap_id, user):
|
|||
return post
|
||||
|
||||
|
||||
def get_redis_connection() -> redis.Redis:
|
||||
connection_string = current_app.config['CACHE_REDIS_URL']
|
||||
if connection_string.startswith('unix://'):
|
||||
unix_socket_path, db, password = parse_redis_pipe_string(connection_string)
|
||||
return redis.Redis(unix_socket_path=unix_socket_path, db=db, password=password)
|
||||
else:
|
||||
host, port, db, password = parse_redis_socket_string(connection_string)
|
||||
return redis.Redis(host=host, port=port, db=db, password=password)
|
||||
|
||||
|
||||
def parse_redis_pipe_string(connection_string: str):
|
||||
if connection_string.startswith('unix://'):
|
||||
# Parse the connection string
|
||||
parsed_url = urlparse(connection_string)
|
||||
|
||||
# Extract the path (Unix socket path)
|
||||
unix_socket_path = parsed_url.path
|
||||
|
||||
# Extract query parameters (if any)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
|
||||
# Extract database number (default to 0 if not provided)
|
||||
db = int(query_params.get('db', [0])[0])
|
||||
|
||||
# Extract password (if provided)
|
||||
password = query_params.get('password', [None])[0]
|
||||
|
||||
return unix_socket_path, db, password
|
||||
|
||||
|
||||
def parse_redis_socket_string(connection_string: str):
|
||||
# Parse the connection string
|
||||
parsed_url = urlparse(connection_string)
|
||||
|
||||
# Extract username (if provided) and password
|
||||
if parsed_url.username:
|
||||
username = parsed_url.username
|
||||
else:
|
||||
username = None
|
||||
password = parsed_url.password
|
||||
|
||||
# Extract host and port
|
||||
host = parsed_url.hostname
|
||||
port = parsed_url.port
|
||||
|
||||
# Extract database number (default to 0 if not provided)
|
||||
db = int(parsed_url.path.lstrip('/') or 0)
|
||||
|
||||
return host, port, db, password
|
||||
|
||||
|
||||
def lemmy_site_data():
|
||||
site = g.site
|
||||
data = {
|
||||
|
|
|
@ -91,7 +91,7 @@ def show_topic(topic_path):
|
|||
per_page = 300
|
||||
posts = posts.paginate(page=page, per_page=per_page, error_out=False)
|
||||
|
||||
topic_communities = Community.query.filter(Community.topic_id == current_topic.id).order_by(Community.name)
|
||||
topic_communities = Community.query.filter(Community.topic_id == current_topic.id, Community.banned == False).order_by(Community.name)
|
||||
|
||||
next_url = url_for('topic.show_topic',
|
||||
topic_path=topic_path,
|
||||
|
|
Loading…
Reference in a new issue