pyfedi/app/activitypub/signature.py

563 lines
21 KiB
Python
Raw Normal View History

2023-08-22 21:24:11 +12:00
# code in this file is from Takahe https://github.com/jointakahe/takahe
#
# Copyright 2022 Andrew Godwin
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import annotations
import base64
import json
from typing import Literal, TypedDict, cast
from urllib.parse import urlparse
import httpx
2023-08-22 21:24:11 +12:00
import arrow
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from flask import Request, current_app
2023-08-22 21:24:11 +12:00
from datetime import datetime
from dateutil import parser
from pyld import jsonld
from email.utils import formatdate
from app import db, celery, httpx_client
2023-08-22 21:24:11 +12:00
from app.constants import DATETIME_MS_FORMAT
from app.models import utcnow, ActivityPubLog, Community, Instance, CommunityMember, User
from sqlalchemy import text
2023-08-22 21:24:11 +12:00
def http_date(epoch_seconds=None):
    """
    Formats a Unix timestamp as an HTTP date string in GMT.

    When no timestamp is given, the current UTC time is used.
    """
    moment = arrow.utcnow().timestamp() if epoch_seconds is None else epoch_seconds
    # takahe uses formatdate so let's try that. An arrow-based
    # 'ddd, DD MMM YYYY HH:mm:ss ZZ' rendering was used here before, but
    # mastodon does not like it.
    return formatdate(moment, usegmt=True)
2023-08-22 21:24:11 +12:00
def format_ld_date(value: datetime) -> str:
    """
    Renders a datetime as the timestamp string used in JSON-LD documents.

    The value is formatted with DATETIME_MS_FORMAT, its last four characters
    are dropped, and a literal "Z" is appended.
    """
    # We chop the timestamp to be identical to the timestamps returned by
    # Mastodon's API, because some clients like Toot! (for iOS) are especially
    # picky about timestamp parsing.
    rendered = value.strftime(DATETIME_MS_FORMAT)
    trimmed = rendered[:-4]
    return trimmed + "Z"
def parse_http_date(http_date_str):
    """
    Parses an HTTP ``Date`` header value into a timezone-aware datetime.

    The standard library's RFC 5322 date parser is used, so both the literal
    "GMT" zone that HTTP dates carry (and that http_date() in this module
    produces) and numeric offsets such as "+0000" are accepted.

    Raises ValueError when the value cannot be parsed as a date.
    """
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    try:
        parsed_date = parsedate_to_datetime(http_date_str)
    except (TypeError, ValueError) as e:
        # Older Python versions raise TypeError for unparseable input;
        # normalise to ValueError so callers see a single failure type.
        raise ValueError(f"Could not parse HTTP date {http_date_str!r}") from e
    if parsed_date.tzinfo is None:
        # A "-0000" zone yields a naive datetime; treat it as UTC so the result
        # can always be compared against timezone-aware values.
        parsed_date = parsed_date.replace(tzinfo=timezone.utc)
    return parsed_date
def parse_ld_date(value: str | None) -> datetime | None:
if value is None:
return None
return parser.isoparse(value).replace(microsecond=0)
2023-12-22 15:34:45 +13:00
def post_request_in_background(uri: str, body: dict | None, private_key: str, key_id: str, content_type: str = "application/activity+json",
                               method: Literal["get", "post"] = "post", timeout: int = 5,):
    """
    Sends a signed request via post_request, normally as a background task.

    When the Flask app is in debug mode, post_request is called inline and its
    return value is passed back. Otherwise the call is queued with
    post_request.delay() and True is returned immediately.
    """
    request_args = {
        'uri': uri,
        'body': body,
        'private_key': private_key,
        'key_id': key_id,
        'content_type': content_type,
        'method': method,
        'timeout': timeout,
    }
    if not current_app.debug:
        post_request.delay(**request_args)
        return True
    return post_request(**request_args)
@celery.task
def post_request(uri: str, body: dict | None, private_key: str, key_id: str, content_type: str = "application/activity+json",
                 method: Literal["get", "post"] = "post", timeout: int = 5,):
    """
    Sends a signed request to a remote server and records the outcome in an ActivityPubLog row.

    A default JSON-LD context is added to the body when it has none. The log row
    is committed as 'processing' before the request is attempted and then updated
    to 'success', 'ignored' (an HTML page came back) or 'failure'.

    Returns True unless the logged result is 'failure', in which case the
    logged exception message is returned instead.
    """
    # body may legitimately be None (see the signature), so guard every access to it
    if body is not None and '@context' not in body:  # add a default json-ld context if necessary
        body['@context'] = default_context()
    activity_type = body.get('type', '') if body is not None else ''
    # NOTE(review): a body without an 'id' is now logged with activity_id=None instead of
    # raising KeyError before anything is logged - confirm the column accepts NULL.
    activity_id = body.get('id') if body is not None else None
    log = ActivityPubLog(direction='out', activity_type=activity_type, result='processing', activity_id=activity_id, exception_message='')
    log.activity_json = json.dumps(body)
    db.session.add(log)
    db.session.commit()

    if uri is None or uri == '':
        log.result = 'failure'
        log.exception_message = 'empty uri'
    else:
        try:
            result = HttpSignature.signed_request(uri, body, private_key, key_id, content_type, method, timeout)
            if result.status_code not in (200, 202, 204):
                log.result = 'failure'
                log.exception_message = f'{result.status_code}: {result.text:.100}' + ' - '
                if 'DOCTYPE html' in result.text:
                    # An HTML page instead of JSON is recorded but not treated as a failure
                    log.result = 'ignored'
                    log.exception_message = f'{result.status_code}: HTML instead of JSON response - '
                    log.activity_json += result.text
                elif 'community_has_no_followers' in result.text:
                    # The remote side reports no followers of this community, so drop
                    # the matching local membership rows
                    fix_local_community_membership(uri, private_key)
                else:
                    current_app.logger.error(f'Response code for post attempt to {uri} was ' +
                                             str(result.status_code) + ' ' + result.text)
            # Record the destination, and the exact status for the non-200 success codes
            log.exception_message += uri
            if result.status_code == 202:
                log.exception_message += ' 202'
            if result.status_code == 204:
                log.exception_message += ' 204'
        except Exception as e:
            log.result = 'failure'
            log.exception_message='could not send:' + str(e)
            current_app.logger.error(f'Exception while sending post to {uri}')
    if log.result == 'processing':
        log.result = 'success'
    db.session.commit()

    if log.result != 'failure':
        return True
    else:
        return log.exception_message
2023-12-22 15:34:45 +13:00
def signed_get_request(uri: str, private_key: str, key_id: str, content_type: str = "application/activity+json",
                       method: Literal["get", "post"] = "get", timeout: int = 5,):
    """
    Performs a signed request without a body (a GET by default).

    Returns the response from HttpSignature.signed_request, or None when the
    request raised an exception. The exception is logged rather than propagated.
    """
    try:
        return HttpSignature.signed_request(uri, None, private_key, key_id, content_type, method, timeout)
    except Exception as e:
        # Returning None here avoids the UnboundLocalError that returning an
        # unassigned response variable would raise.
        current_app.logger.error(f'Exception while sending signed {method} request to {uri}: {e}')
        return None
2023-08-22 21:24:11 +12:00
class VerificationError(BaseException):
    """
    Raised when a signature could not be verified.

    NOTE: this derives from BaseException rather than Exception, so a plain
    `except Exception` handler does not catch it.
    """
class VerificationFormatError(VerificationError):
    """
    Raised when the signature is malformed, as opposed to well-formed but invalid.
    """
class RsaKeys:
    """
    Helpers for creating RSA key material
    """

    @classmethod
    def generate_keypair(cls) -> tuple[str, str]:
        """
        Generates a new 2048-bit RSA keypair.

        Returns a (private key, public key) tuple of PEM strings: the private
        key as unencrypted PKCS8, the public key as SubjectPublicKeyInfo.
        """
        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        private_pem = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        public_pem = key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return private_pem.decode("ascii"), public_pem.decode("ascii")
2024-05-28 15:15:53 +12:00
# Get a piece of the signature string. Similar to parse_signature except unencumbered by needing to return a HttpSignatureDetails
def signature_part(signature, key):
    """
    Returns the value of one `name=value` parameter from a Signature header.

    Parameters are comma-separated. Whitespace around the name and value is
    ignored and double quotes are removed from the value. The name comparison
    is case-sensitive.

    Returns '' when the header is missing or empty, or when no parameter with
    that name (and a value) is present.
    """
    if not signature:
        return ''
    for part in signature.split(','):
        # Split on the first '=' only: values such as base64 signatures or URLs
        # may themselves contain '=' characters.
        name, separator, value = part.partition('=')
        if separator and name.strip() == key:
            return value.strip().replace('"', '')
    return ''
2023-08-22 21:24:11 +12:00
class HttpSignature:
"""
Allows for calculation and verification of HTTP signatures
"""
@classmethod
def calculate_digest(cls, data, algorithm="sha-256") -> str:
"""
Calculates the digest header value for a given HTTP body
"""
if algorithm == "sha-256":
digest = hashes.Hash(hashes.SHA256())
digest.update(data)
return "SHA-256=" + base64.b64encode(digest.finalize()).decode("ascii")
else:
raise ValueError(f"Unknown digest algorithm {algorithm}")
@classmethod
def headers_from_request(cls, request: Request, header_names: list[str]) -> str:
"""
Creates the to-be-signed header payload from a Flask request
"""
headers = {}
for header_name in header_names:
if header_name == "(request-target)":
value = f"{request.method.lower()} {request.path}"
elif header_name == '(created)':
value = signature_part(request.headers.get('Signature'), 'created') # Don't use parse_signature because changing HttpSignatureDetails changes everything & I don't have the spoons for that ATM.
elif header_name == '(expires)':
value = signature_part(request.headers.get('Signature'), 'expires')
2023-08-22 21:24:11 +12:00
elif header_name == "content-type":
value = request.headers.get("Content-Type", "")
elif header_name == "content-length":
value = request.headers.get("Content-Length", "")
else:
value = request.headers.get(header_name.replace("-", "_").upper(), "")
headers[header_name] = value
return "\n".join(f"{name.lower()}: {value}" for name, value in headers.items())
@classmethod
def parse_signature(cls, signature: str) -> "HttpSignatureDetails":
bits = {}
for item in signature.split(","):
name, value = item.split("=", 1)
value = value.strip('"')
bits[name.lower()] = value
try:
signature_details: HttpSignatureDetails = {
"headers": bits["headers"].split(),
"signature": base64.b64decode(bits["signature"]),
"algorithm": bits["algorithm"],
"keyid": bits["keyid"],
}
except KeyError as e:
key_names = " ".join(bits.keys())
raise VerificationError(
f"Missing item from details (have: {key_names}, error: {e})"
)
return signature_details
@classmethod
def compile_signature(cls, details: "HttpSignatureDetails") -> str:
value = f'keyId="{details["keyid"]}",headers="'
value += " ".join(h.lower() for h in details["headers"])
value += '",signature="'
value += base64.b64encode(details["signature"]).decode("ascii")
value += f'",algorithm="{details["algorithm"]}"'
return value
@classmethod
def verify_signature(
cls,
signature: bytes,
cleartext: str,
public_key: str,
):
public_key_instance: rsa.RSAPublicKey = cast(
rsa.RSAPublicKey,
serialization.load_pem_public_key(public_key.encode("ascii")),
)
try:
public_key_instance.verify(
signature,
cleartext.encode("ascii"),
padding.PKCS1v15(),
hashes.SHA256(),
)
except InvalidSignature:
raise VerificationError("Signature mismatch")
@classmethod
def verify_request(cls, request: Request, public_key, skip_date=False):
"""
Verifies that the request has a valid signature for its body
"""
# Verify body digest
if "digest" in request.headers:
expected_digest = HttpSignature.calculate_digest(request.data)
if request.headers["digest"] != expected_digest:
raise VerificationFormatError("Digest is incorrect")
# Verify date header
if "date" in request.headers and not skip_date:
header_date = parse_http_date(request.headers["date"])
if abs((arrow.utcnow() - header_date).total_seconds()) > 3600:
raise VerificationFormatError("Date is too far away")
# Get the signature details
if "signature" not in request.headers:
raise VerificationFormatError("No signature header present")
signature_details = cls.parse_signature(request.headers["signature"])
# Reject unknown algorithms
# hs2019 is used by some libraries to obfuscate the real algorithm per the spec
# https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-12
if (
signature_details["algorithm"] != "rsa-sha256"
and signature_details["algorithm"] != "hs2019"
):
raise VerificationFormatError("Unknown signature algorithm")
# Create the signature payload
headers_string = cls.headers_from_request(request, signature_details["headers"])
cls.verify_signature(
signature_details["signature"],
headers_string,
public_key,
)
return True
2023-08-22 21:24:11 +12:00
@classmethod
def signed_request(
cls,
uri: str,
body: dict | None,
private_key: str,
key_id: str,
content_type: str = "application/activity+json",
2023-08-22 21:24:11 +12:00
method: Literal["get", "post"] = "post",
timeout: int = 5,
2023-08-22 21:24:11 +12:00
):
"""
Performs a request to the given path, with a document, signed
2023-08-22 21:24:11 +12:00
as an identity.
"""
if "://" not in uri:
raise ValueError("URI does not contain a scheme")
# Create the core header field set
uri_parts = urlparse(uri)
date_string = http_date()
headers = {
"(request-target)": f"{method} {uri_parts.path}",
"Host": uri_parts.hostname,
"Date": date_string,
}
# If we have a body, add a digest and content type
if body is not None:
if '@context' not in body: # add a default json-ld context if necessary
body['@context'] = default_context()
2023-08-22 21:24:11 +12:00
body_bytes = json.dumps(body).encode("utf8")
headers["Digest"] = cls.calculate_digest(body_bytes)
headers["Content-Type"] = content_type
else:
body_bytes = b""
# GET requests get implicit accept headers added
if method == "get":
headers["Accept"] = "application/ld+json"
# Sign the headers
signed_string = "\n".join(
f"{name.lower()}: {value}" for name, value in headers.items()
)
private_key_instance: rsa.RSAPrivateKey = cast(
rsa.RSAPrivateKey,
serialization.load_pem_private_key(
private_key.encode("ascii"),
password=None,
),
)
signature = private_key_instance.sign(
signed_string.encode("ascii"),
padding.PKCS1v15(),
hashes.SHA256(),
)
headers["Signature"] = cls.compile_signature(
{
"keyid": key_id,
"headers": list(headers.keys()),
"signature": signature,
"algorithm": "rsa-sha256",
}
)
headers["User-Agent"] = 'PieFed/1.0'
2023-08-22 21:24:11 +12:00
# Send the request with all those headers except the pseudo one
del headers["(request-target)"]
try:
response = httpx_client.request(
2023-08-22 21:24:11 +12:00
method,
uri,
headers=headers,
data=body_bytes,
timeout=timeout,
follow_redirects=method == "GET",
2023-08-22 21:24:11 +12:00
)
except httpx.HTTPError as ex:
2023-08-22 21:24:11 +12:00
# Convert to a more generic error we handle
raise httpx.HTTPError(f"HTTP Exception for {ex.request.url} - {ex}") from None
2023-08-22 21:24:11 +12:00
if (
method == "POST"
and 400 <= response.status_code < 500
and response.status_code != 404
):
raise ValueError(
f"POST error to {uri}: {response.status_code} {response.content!r}"
)
return response
class HttpSignatureDetails(TypedDict):
    """
    The parsed components of an HTTP Signature header
    """

    # Signature algorithm name as given in the header, e.g. "rsa-sha256"
    algorithm: str
    # Names of the signed headers, in signing order
    headers: list[str]
    # Raw (base64-decoded) signature bytes
    signature: bytes
    # Identifier of the key that produced the signature
    keyid: str
class LDSignature:
    """
    Creates and verifies signatures of JSON-LD documents
    """

    @classmethod
    def verify_signature(cls, document: dict, public_key: str) -> None:
        """
        Verifies a document

        The "signature" member is removed from document (in place) before
        hashing, since the signature covers the document without it.

        Raises VerificationFormatError when the signature section is missing,
        incomplete or of an unknown type, and VerificationError when the
        signature does not match.
        """
        try:
            # Strip out the signature from the incoming document
            signature = document.pop("signature")
            # Create the options document
            options = {
                "@context": "https://w3id.org/security/v1",
                "creator": signature["creator"],
                "created": signature["created"],
            }
            # Read the remaining required members here too, so an incomplete
            # signature section is reported as a format error rather than a KeyError
            signature_type = signature["type"]
            signature_value = signature["signatureValue"]
        except (KeyError, TypeError):
            raise VerificationFormatError("Invalid signature section")
        if signature_type.lower() != "rsasignature2017":
            raise VerificationFormatError("Unknown signature type")
        # Get the normalised hash of each document
        final_hash = cls.normalized_hash(options) + cls.normalized_hash(document)
        # Verify the signature
        public_key_instance: rsa.RSAPublicKey = cast(
            rsa.RSAPublicKey,
            serialization.load_pem_public_key(public_key.encode("ascii")),
        )
        try:
            public_key_instance.verify(
                base64.b64decode(signature_value),
                final_hash,
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
        except InvalidSignature:
            raise VerificationError("Signature mismatch")

    @classmethod
    def create_signature(
        cls, document: dict, private_key: str, key_id: str
    ) -> dict[str, str]:
        """
        Creates the signature for a document

        Returns the signature section: the options (context, creator, created)
        plus "signatureValue" and "type". The document itself is not modified here.
        """
        # Create the options document
        options: dict[str, str] = {
            "@context": "https://w3id.org/security/v1",
            "creator": key_id,
            "created": format_ld_date(utcnow()),
        }
        # Get the normalised hash of each document
        final_hash = cls.normalized_hash(options) + cls.normalized_hash(document)
        # Create the signature
        private_key_instance: rsa.RSAPrivateKey = cast(
            rsa.RSAPrivateKey,
            serialization.load_pem_private_key(
                private_key.encode("ascii"),
                password=None,
            ),
        )
        signature = base64.b64encode(
            private_key_instance.sign(
                final_hash,
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
        )
        # Add it to the options document along with other bits
        options["signatureValue"] = signature.decode("ascii")
        options["type"] = "RsaSignature2017"
        return options

    @classmethod
    def normalized_hash(cls, document) -> bytes:
        """
        Takes a JSON-LD document and create a hash of its URDNA2015 form,
        in the same way that Mastodon does internally.

        The result is the hex SHA-256 digest, encoded as ASCII bytes.

        Reference: https://socialhub.activitypub.rocks/t/making-sense-of-rsasignature2017/347
        """
        norm_form = jsonld.normalize(
            document,
            {"algorithm": "URDNA2015", "format": "application/n-quads"},
        )
        digest = hashes.Hash(hashes.SHA256())
        digest.update(norm_form.encode("utf8"))
        return digest.finalize().hex().encode("ascii")
def default_context():
    """
    Builds the default JSON-LD @context for outgoing activities.

    Always contains the ActivityStreams and security contexts. When the
    FULL_AP_CONTEXT config value is truthy, a dict of additional term
    definitions is appended.
    """
    context = [
        "https://www.w3.org/ns/activitystreams",
        "https://w3id.org/security/v1",
    ]
    if not current_app.config['FULL_AP_CONTEXT']:
        return context

    extended_terms = {
        "lemmy": "https://join-lemmy.org/ns#",
        "litepub": "http://litepub.social/ns#",
        "pt": "https://joinpeertube.org/ns#",
        "sc": "http://schema.org/",
        "ChatMessage": "litepub:ChatMessage",
        "commentsEnabled": "pt:commentsEnabled",
        "sensitive": "as:sensitive",
        "matrixUserId": "lemmy:matrixUserId",
        "postingRestrictedToMods": "lemmy:postingRestrictedToMods",
        "removeData": "lemmy:removeData",
        "stickied": "lemmy:stickied",
        "moderators": {
            "@type": "@id",
            "@id": "lemmy:moderators"
        },
        "expires": "as:endTime",
        "distinguished": "lemmy:distinguished",
        "language": "sc:inLanguage",
        "identifier": "sc:identifier"
    }
    context.append(extended_terms)
    return context
def fix_local_community_membership(uri: str, private_key: str):
    """
    Deletes a community's membership rows for users belonging to one instance.

    The community is looked up by its private key and the instance by the
    host part of uri. Nothing happens unless both are found. The deletes are
    executed on the session but not committed here.
    """
    community = Community.query.filter_by(private_key=private_key).first()
    instance_domain = urlparse(uri).netloc
    instance = Instance.query.filter_by(domain=instance_domain).first()
    if not (community and instance):
        return

    followers = (
        CommunityMember.query.filter_by(community_id=community.id)
        .join(User, User.id == CommunityMember.user_id)
        .filter(User.instance_id == instance.id)
    )
    delete_membership = text('DELETE FROM "community_member" WHERE user_id = :user_id AND community_id = :community_id')
    for f in followers:
        db.session.execute(delete_membership, {'user_id': f.user_id, 'community_id': community.id})