Skip to content
Snippets Groups Projects
Commit d6cd1659 authored by supersonicwisd1's avatar supersonicwisd1
Browse files

webfinger and example update

Signed-off-by: supersonicwisd1 <supersonicwisd1>
parent 92328511
No related branches found
No related tags found
1 merge request!9Update on the Webfinger and Webfinger test and Signing the keys in note example
""" """
Federation examples showing common ActivityPub interactions. Federation example showing handlers and signatures.
""" """
import asyncio import asyncio
import logging import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
import sys
# Add src directory to Python path from pyfed.models import APCreate, APNote, APPerson, APLike
src_path = Path(__file__).parent.parent / "src" from pyfed.security import KeyManager
sys.path.insert(0, str(src_path)) from pyfed.security.http_signatures import HTTPSignatureVerifier
from pyfed.storage import StorageBackend
from pyfed.security.key_management import KeyManager from pyfed.federation import ActivityDelivery
from pyfed.federation.delivery import ActivityDelivery from pyfed.handlers import CreateHandler, LikeHandler
from pyfed.federation.discovery import InstanceDiscovery
from pyfed.protocols.webfinger import WebFingerClient from pyfed.protocols.webfinger import WebFingerClient
from pyfed.models import APCreate, APNote, APPerson, APFollow, APLike, APAnnounce
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class FederationExample: async def federation_example():
"""Federation interaction examples.""" """Example of federation with handlers and signatures."""
try:
def __init__(self, domain: str = "example.com"): # Initialize components
self.domain = domain storage = StorageBackend.create(
self.key_manager = None provider="sqlite",
self.delivery = None database_url="example_data/pyfed_example.db"
self.webfinger = None
self.discovery = None
async def initialize(self):
"""Initialize components."""
# Initialize key manager
self.key_manager = KeyManager(
domain=self.domain,
keys_path=str(Path("example_keys").resolve())
) )
logger.info("Initializing key manager...") await storage.initialize()
await self.key_manager.initialize()
# Initialize delivery # Initialize key manager and signature verifier
self.delivery = ActivityDelivery( key_manager = KeyManager(
key_manager=self.key_manager, domain="example.com",
timeout=30 private_key_path="example_keys/private.pem",
public_key_path="example_keys/public.pem"
) )
logger.info("Initializing delivery...") await key_manager.initialize()
await self.delivery.initialize()
# Initialize WebFinger client with SSL verification disabled for testing signature_verifier = HTTPSignatureVerifier(key_manager)
self.webfinger = WebFingerClient(verify_ssl=False)
logger.info("Initializing WebFinger client...")
await self.webfinger.initialize()
async def send_public_post(self, content: str): # Initialize handlers
"""Send a public post to the Fediverse.""" create_handler = CreateHandler(storage)
logger.info(f"Sending public post: {content}") like_handler = LikeHandler(storage)
# Initialize delivery with signature support
delivery = ActivityDelivery(
key_manager=key_manager,
signature_verifier=signature_verifier
)
# Create local actor # Create and send a note
actor = APPerson( actor = APPerson(
id=f"https://{self.domain}/users/alice", id="https://example.com/users/alice",
name="Alice", name="Alice",
preferred_username="alice", preferred_username="alice",
inbox=f"https://{self.domain}/users/alice/inbox", inbox="https://example.com/users/alice/inbox",
outbox=f"https://{self.domain}/users/alice/outbox" outbox="https://example.com/users/alice/outbox"
) )
# Create note
note = APNote( note = APNote(
id=f"https://{self.domain}/notes/{datetime.utcnow().timestamp()}", id=f"https://example.com/notes/{datetime.now(timezone.utc).timestamp()}",
content=content, content="Hello Fediverse!",
attributed_to=str(actor.id), attributed_to=str(actor.id),
to=["https://www.w3.org/ns/activitystreams#Public"], to=["https://www.w3.org/ns/activitystreams#Public"]
published=datetime.utcnow().isoformat()
) )
# Create activity # Create activity
create_activity = APCreate( create_activity = APCreate(
id=f"https://{self.domain}/activities/{datetime.utcnow().timestamp()}", id=f"https://example.com/activities/{datetime.now(timezone.utc).timestamp()}",
actor=str(actor.id), actor=str(actor.id),
object=note, object=note,
to=note.to, to=note.to
published=datetime.utcnow().isoformat()
)
# Deliver to followers (example)
activity_dict = create_activity.serialize()
result = await self.delivery.deliver_activity(
activity=activity_dict,
recipients=[f"https://{self.domain}/followers"]
) )
logger.info(f"Delivery result: {result}")
async def send_direct_message(self, recipient: str, content: str):
"""Send a direct message to a specific user."""
logger.info(f"Sending direct message to {recipient}")
# Resolve recipient's inbox # Process outgoing activity through handler
inbox_url = await self.webfinger.get_inbox_url(recipient) await create_handler.handle_outgoing(create_activity.serialize())
if not inbox_url:
logger.error(f"Could not find inbox for {recipient}")
return
# Create note # Sign and deliver activity
note = APNote(
id=f"https://{self.domain}/notes/{datetime.utcnow().timestamp()}",
content=content,
attributed_to=f"https://{self.domain}/users/alice",
to=[inbox_url],
published=datetime.utcnow().isoformat()
)
# Create activity
create_activity = APCreate(
id=f"https://{self.domain}/activities/{datetime.utcnow().timestamp()}",
actor=f"https://{self.domain}/users/alice",
object=note,
to=note.to,
published=datetime.utcnow().isoformat()
)
# Deliver direct message
activity_dict = create_activity.serialize() activity_dict = create_activity.serialize()
result = await self.delivery.deliver_activity( signed_headers = await signature_verifier.sign_request(
activity=activity_dict, method="POST",
recipients=[inbox_url] path="/inbox",
body=activity_dict
) )
logger.info(f"Delivery result: {result}")
async def follow_account(self, account: str):
"""Follow a remote account."""
logger.info(f"Following account: {account}")
# Resolve account
actor_url = await self.webfinger.get_actor_url(account)
if not actor_url:
logger.error(f"Could not resolve account {account}")
return
# Create Follow activity
follow = APFollow(
id=f"https://{self.domain}/activities/follow_{datetime.utcnow().timestamp()}",
actor=f"https://{self.domain}/users/alice",
object=actor_url,
published=datetime.utcnow().isoformat()
)
# Get target inbox
inbox_url = await self.webfinger.get_inbox_url(account)
if not inbox_url:
logger.error(f"Could not find inbox for {account}")
return
# Deliver Follow activity # Deliver with signed headers
activity_dict = follow.serialize() result = await delivery.deliver_activity(
result = await self.delivery.deliver_activity(
activity=activity_dict, activity=activity_dict,
recipients=[inbox_url] recipients=["https://remote.example/inbox"],
) headers=signed_headers
logger.info(f"Follow result: {result}")
async def close(self):
"""Clean up resources."""
if self.delivery and hasattr(self.delivery, 'close'):
await self.delivery.close()
if self.webfinger and hasattr(self.webfinger, 'close'):
await self.webfinger.close()
async def main():
"""Run federation examples."""
federation = FederationExample()
logger.info("Initializing federation...")
await federation.initialize()
try:
# Example 1: Send public post
logger.info("Sending public post...")
await federation.send_public_post(
"Hello #Fediverse! This is a test post from PyFed!"
)
# Example 2: Send direct message
logger.info("Sending direct message...")
await federation.send_direct_message(
"kene29@mastodon.social",
"Hello! This is a direct message test from PyFed."
) )
# Example 3: Follow account # Simulate receiving an activity
logger.info("Following account...") incoming_headers = {
await federation.follow_account("kene29@mastodon.social") "Signature": "...", # Signature from remote server
"Date": "...",
"Host": "example.com"
}
incoming_activity = {
"type": "Like",
"actor": "https://remote.example/users/bob",
"object": note.id
}
# Verify signature of incoming activity
if await signature_verifier.verify_request(incoming_headers):
# Process through handler
await like_handler.handle_incoming(incoming_activity)
else:
logger.error("Invalid signature on incoming activity")
finally: finally:
await federation.close() await storage.close()
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(federation_example())
\ No newline at end of file \ No newline at end of file
...@@ -9,6 +9,7 @@ import os ...@@ -9,6 +9,7 @@ import os
import sys import sys
from pathlib import Path from pathlib import Path
import ssl import ssl
from urllib.parse import urlparse
# Add src directory to Python path # Add src directory to Python path
src_path = Path(__file__).parent.parent / "src" src_path = Path(__file__).parent.parent / "src"
...@@ -19,6 +20,7 @@ from pyfed.security import KeyManager ...@@ -19,6 +20,7 @@ from pyfed.security import KeyManager
from pyfed.storage import StorageBackend from pyfed.storage import StorageBackend
from pyfed.federation import ActivityDelivery from pyfed.federation import ActivityDelivery
from pyfed.protocols.webfinger import WebFingerClient from pyfed.protocols.webfinger import WebFingerClient
from pyfed.security.http_signatures import HTTPSignatureVerifier
from pyfed.serializers.json_serializer import to_json, from_json from pyfed.serializers.json_serializer import to_json, from_json
# Configure logging # Configure logging
...@@ -58,6 +60,16 @@ async def create_note_example(): ...@@ -58,6 +60,16 @@ async def create_note_example():
) )
await key_manager.initialize() await key_manager.initialize()
# Get the active key for signing
active_key = await key_manager.get_active_key()
logger.info(f"Active key: {active_key}")
# Initialize HTTP signature verifier with the active key paths
signature_verifier = HTTPSignatureVerifier(
private_key_path=f"{key_manager.keys_path}/{active_key.key_id}_private.pem",
public_key_path=f"{key_manager.keys_path}/{active_key.key_id}_public.pem"
)
# Initialize delivery # Initialize delivery
delivery = ActivityDelivery( delivery = ActivityDelivery(
key_manager=key_manager, key_manager=key_manager,
...@@ -112,13 +124,25 @@ async def create_note_example(): ...@@ -112,13 +124,25 @@ async def create_note_example():
# Serialize and deliver # Serialize and deliver
logger.info("Serializing activity...") logger.info("Serializing activity...")
activity_dict = create_activity.serialize() activity_dict = create_activity.serialize()
logger.info(f"Serialized activity: {activity_dict}") # logger.info(f"Serialized activity: {activity_dict}")
logger.info(f"Activity: {to_json(create_activity, indent=2)}") # logger.info(f"Activity: {to_json(create_activity, indent=2)}")
# Sign the request
logger.debug(f"Resolved host: {urlparse(inbox_url).netloc}")
signed_headers = await signature_verifier.sign_request(
method="POST",
path="/inbox",
headers={"Host": urlparse(inbox_url).netloc},
body=activity_dict
)
logger.debug(f"Headers before signing: {signed_headers}")
logger.info("Delivering activity...") logger.info("Delivering activity...")
result = await delivery.deliver_activity( result = await delivery.deliver_activity(
activity=activity_dict, activity=activity_dict,
recipients=[inbox_url] recipients=[inbox_url],
headers=signed_headers # Include signed headers
) )
logger.info(f"Delivery result: {result}") logger.info(f"Delivery result: {result}")
......
File added
...@@ -10,7 +10,7 @@ from urllib.parse import urlparse ...@@ -10,7 +10,7 @@ from urllib.parse import urlparse
from ..utils.exceptions import ResolverError from ..utils.exceptions import ResolverError
from ..utils.logging import get_logger from ..utils.logging import get_logger
from ..cache.actor_cache import ActorCache from ..cache.actor_cache import ActorCache
from ..security.webfinger import WebFingerService from ..protocols.webfinger import WebFingerClient
logger = get_logger(__name__) logger = get_logger(__name__)
...@@ -19,7 +19,7 @@ class ActivityPubResolver: ...@@ -19,7 +19,7 @@ class ActivityPubResolver:
def __init__(self, def __init__(self,
actor_cache: Optional[ActorCache] = None, actor_cache: Optional[ActorCache] = None,
discovery_service: Optional[WebFingerService] = None): discovery_service: Optional[WebFingerClient] = None):
""" """
Initialize resolver. Initialize resolver.
......
...@@ -5,35 +5,23 @@ WebFinger protocol implementation. ...@@ -5,35 +5,23 @@ WebFinger protocol implementation.
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
import aiohttp import aiohttp
from urllib.parse import quote from urllib.parse import quote
import json import logging
from ..utils.exceptions import WebFingerError logger = logging.getLogger(__name__)
from ..utils.logging import get_logger
logger = get_logger(__name__)
class WebFingerClient: class WebFingerClient:
"""WebFinger client implementation.""" """WebFinger client implementation."""
def __init__(self, timeout: int = 30, verify_ssl: bool = True): def __init__(self, verify_ssl: bool = True):
self.timeout = timeout
self.verify_ssl = verify_ssl self.verify_ssl = verify_ssl
self.session = None self.session = None
async def initialize(self) -> None: async def initialize(self) -> None:
"""Initialize client.""" """Initialize client."""
# Create SSL context ssl = None if self.verify_ssl else False
ssl_context = None
if not self.verify_ssl:
import ssl
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self.session = aiohttp.ClientSession( self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.timeout),
headers={"Accept": "application/jrd+json, application/json"}, headers={"Accept": "application/jrd+json, application/json"},
connector=aiohttp.TCPConnector(ssl=ssl_context) connector=aiohttp.TCPConnector(ssl=ssl)
) )
async def finger(self, account: str) -> Optional[Dict[str, Any]]: async def finger(self, account: str) -> Optional[Dict[str, Any]]:
...@@ -41,34 +29,27 @@ class WebFingerClient: ...@@ -41,34 +29,27 @@ class WebFingerClient:
Perform WebFinger lookup. Perform WebFinger lookup.
Args: Args:
account: Account to look up (e.g. user@domain.com) account: Account to look up (e.g., user@domain.com)
Returns: Returns:
WebFinger response if found WebFinger response if found
""" """
try: try:
# Parse account if not '@' in account:
if account.startswith('acct:'):
account = account[5:]
if '@' not in account:
raise WebFingerError(f"Invalid account format: {account}")
username, domain = account.split('@')
resource = f"acct:{username}@{domain}"
# Construct WebFinger URL
url = f"https://{domain}/.well-known/webfinger?resource={quote(resource)}"
# Perform lookup
async with self.session.get(url) as response:
if response.status != 200:
logger.error(
f"WebFinger lookup failed for {account}: {response.status}"
)
return None return None
data = await response.json() # Ensure acct: prefix
return data if not account.startswith('acct:'):
account = f"acct:{account}"
domain = account.split('@')[-1]
url = f"https://{domain}/.well-known/webfinger?resource={quote(account)}"
response = await self.session.get(url)
async with response as resp:
if resp.status != 200:
return None
return await resp.json()
except Exception as e: except Exception as e:
logger.error(f"WebFinger lookup failed for {account}: {e}") logger.error(f"WebFinger lookup failed for {account}: {e}")
...@@ -89,14 +70,10 @@ class WebFingerClient: ...@@ -89,14 +70,10 @@ class WebFingerClient:
if not data: if not data:
return None return None
# Look for actor URL in links
for link in data.get('links', []): for link in data.get('links', []):
if ( if (link.get('rel') == 'self' and
link.get('rel') == 'self' and link.get('type') == 'application/activity+json'):
link.get('type') == 'application/activity+json'
):
return link.get('href') return link.get('href')
return None return None
except Exception as e: except Exception as e:
...@@ -114,18 +91,17 @@ class WebFingerClient: ...@@ -114,18 +91,17 @@ class WebFingerClient:
Inbox URL if found Inbox URL if found
""" """
try: try:
# First get actor URL
actor_url = await self.get_actor_url(account) actor_url = await self.get_actor_url(account)
logger.debug(f"actor_url: {actor_url}")
if not actor_url: if not actor_url:
return None return None
# Fetch actor object response = await self.session.get(actor_url)
async with self.session.get(actor_url) as response: async with response as resp:
if response.status != 200: if resp.status != 200:
return None return None
data = await resp.json()
actor = await response.json() return data.get('inbox')
return actor.get('inbox')
except Exception as e: except Exception as e:
logger.error(f"Failed to get inbox URL for {account}: {e}") logger.error(f"Failed to get inbox URL for {account}: {e}")
......
"""
Security component interfaces.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
class SignatureVerifier(ABC):
"""Interface for HTTP signature verification."""
@abstractmethod
async def verify_request(self, headers: Dict[str, str]) -> bool:
"""
Verify request signature.
Args:
headers: Request headers including signature
Returns:
True if signature is valid
"""
pass
@abstractmethod
async def sign_request(self,
method: str,
path: str,
host: str,
body: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
"""
Sign a request.
Args:
method: HTTP method
path: Request path
host: Target host
body: Optional request body
Returns:
Headers with signature
Raises:
SignatureError: If signing fails
"""
pass
\ No newline at end of file
"""
Enhanced rate limiting with better configurability and monitoring.
"""
from typing import Dict, Optional, NamedTuple, List
import time
import asyncio
from datetime import datetime
from dataclasses import dataclass
import redis.asyncio as redis
from ..utils.exceptions import RateLimitExceeded
from ..utils.logging import get_logger
logger = get_logger(__name__)
@dataclass
class RateLimitConfig:
"""Rate limit configuration."""
requests_per_minute: int
window_seconds: int = 60
burst_size: int = 0 # Allow burst over limit
redis_url: Optional[str] = None
class RateLimitInfo(NamedTuple):
"""Rate limit information."""
remaining: int
reset_time: int
limit: int
class RateLimiter:
"""Enhanced rate limiting implementation."""
def __init__(self, config: RateLimitConfig):
self.config = config
self.redis_client = None
self._local_counters: Dict[str, Dict] = {}
self._lock = asyncio.Lock()
self.metrics = {
'total_requests': 0,
'exceeded_limits': 0,
'redis_errors': 0
}
async def init(self) -> None:
"""Initialize rate limiter."""
if self.config.redis_url:
await self._init_redis()
async def _init_redis(self) -> None:
"""Initialize Redis connection with retries."""
retries = 3
for attempt in range(retries):
try:
self.redis_client = redis.from_url(self.config.redis_url)
await self.redis_client.ping()
break
except Exception as e:
if attempt == retries - 1:
logger.error(f"Failed to connect to Redis: {e}")
self.redis_client = None
else:
await asyncio.sleep(1)
async def check_rate_limit(self, client_id: str) -> RateLimitInfo:
"""Enhanced rate limit check with detailed information."""
self.metrics['total_requests'] += 1
try:
current_time = int(time.time())
if self.redis_client:
count = await self._check_redis_rate_limit(client_id, current_time)
else:
async with self._lock:
count = await self._check_local_rate_limit(client_id, current_time)
remaining = max(0, self.config.requests_per_minute - count)
reset_time = (current_time // self.config.window_seconds + 1) * self.config.window_seconds
if count > self.config.requests_per_minute + self.config.burst_size:
self.metrics['exceeded_limits'] += 1
raise RateLimitExceeded(
message=f"Rate limit exceeded. Try again after {reset_time}",
reset_time=reset_time,
limit=self.config.requests_per_minute,
remaining=remaining
)
return RateLimitInfo(
remaining=remaining,
reset_time=reset_time,
limit=self.config.requests_per_minute
)
except RateLimitExceeded:
raise
except redis.RedisError as e:
self.metrics['redis_errors'] += 1
logger.error(f"Redis error in rate limit check: {e}")
# Fall back to local rate limiting
return await self._check_local_rate_limit(client_id, current_time)
async def _check_redis_rate_limit(self, client_id: str, current_time: int) -> int:
"""Enhanced Redis rate limiting with sliding window."""
key = f"rate_limit:{client_id}"
window_key = f"{key}:{current_time // self.config.window_seconds}"
async with self.redis_client.pipeline() as pipe:
try:
# Use pipeline for atomic operations
pipe.watch(window_key)
current_count = await pipe.get(window_key) or 0
pipe.multi()
pipe.incr(window_key)
pipe.expire(window_key, self.config.window_seconds)
# Handle sliding window
prev_window_key = f"{key}:{(current_time // self.config.window_seconds) - 1}"
prev_count = await pipe.get(prev_window_key) or 0
results = await pipe.execute()
current_count = int(results[0])
# Calculate weighted count for sliding window
weight = ((current_time % self.config.window_seconds) / self.config.window_seconds)
total_count = int(current_count + (int(prev_count) * (1 - weight)))
return total_count
except redis.WatchError:
# Key modified, retry
return await self._check_redis_rate_limit(client_id, current_time)
async def get_metrics(self) -> Dict[str, int]:
"""Get rate limiter metrics."""
return {
**self.metrics,
'active_clients': len(self._local_counters)
}
async def reset_limits(self, client_id: Optional[str] = None) -> None:
"""Reset rate limits for client or all clients."""
if client_id:
if self.redis_client:
pattern = f"rate_limit:{client_id}:*"
keys = await self.redis_client.keys(pattern)
if keys:
await self.redis_client.delete(*keys)
if client_id in self._local_counters:
del self._local_counters[client_id]
else:
if self.redis_client:
pattern = "rate_limit:*"
keys = await self.redis_client.keys(pattern)
if keys:
await self.redis_client.delete(*keys)
self._local_counters.clear()
\ No newline at end of file
\ No newline at end of file
"""
Enhanced WebFinger implementation for ActivityPub.
"""
import aiohttp
import json
from typing import Dict, Any, Optional, List, Union
from dataclasses import dataclass
from urllib.parse import urlparse, urljoin
import cachetools
import asyncio
from datetime import timedelta
@dataclass
class WebFingerConfig:
"""Configuration for WebFinger service."""
cache_ttl: int = 3600 # Cache TTL in seconds
timeout: int = 10 # Request timeout in seconds
max_redirects: int = 3
cache_size: int = 1000
user_agent: str = "PyFed/1.0 (WebFinger)"
allowed_protocols: List[str] = ("https",)
class WebFingerError(Exception):
"""Base exception for WebFinger errors."""
pass
class WebFingerService:
"""Enhanced WebFinger service."""
def __init__(self,
local_domain: str,
config: Optional[WebFingerConfig] = None):
"""
Initialize WebFinger service.
Args:
local_domain: Local server domain
config: Optional configuration
"""
self.local_domain = local_domain
self.config = config or WebFingerConfig()
# Initialize cache
self.cache = cachetools.TTLCache(
maxsize=self.config.cache_size,
ttl=self.config.cache_ttl
)
# Lock for thread safety
self._lock = asyncio.Lock()
# Metrics
self.metrics = {
'webfinger_requests': 0,
'cache_hits': 0,
'cache_misses': 0,
'failed_requests': 0
}
async def get_resource(self, resource: str) -> Dict[str, Any]:
"""
Get WebFinger resource data.
Args:
resource: Resource identifier (acct:user@domain or https://domain/users/user)
Returns:
Dict containing resource data
Raises:
WebFingerError: If resource lookup fails
"""
try:
self.metrics['webfinger_requests'] += 1
# Check cache
async with self._lock:
if resource in self.cache:
self.metrics['cache_hits'] += 1
return self.cache[resource]
self.metrics['cache_misses'] += 1
# Parse resource
domain = self._get_domain(resource)
if not domain:
raise WebFingerError(f"Invalid resource: {resource}")
# Make WebFinger request
data = await self._fetch_webfinger(domain, resource)
# Cache result
async with self._lock:
self.cache[resource] = data
return data
except WebFingerError:
raise
except Exception as e:
self.metrics['failed_requests'] += 1
raise WebFingerError(f"WebFinger lookup failed: {e}")
async def get_actor_url(self, account: str) -> str:
"""
Get ActivityPub actor URL for account.
Args:
account: Account in format user@domain
Returns:
Actor URL
Raises:
WebFingerError: If actor lookup fails
"""
try:
# Validate account format
if '@' not in account:
raise WebFingerError("Invalid account format")
username, domain = account.split('@')
# First try WebFinger
resource = f"acct:{account}"
try:
data = await self.get_resource(resource)
# Look for ActivityPub profile URL
for link in data.get('links', []):
if (link.get('rel') == 'self' and
link.get('type') == 'application/activity+json'):
return link['href']
except WebFingerError:
# Fall back to direct URL only for allowed protocols
if domain:
url = f"https://{domain}/users/{username}"
if urlparse(url).scheme in self.config.allowed_protocols:
return url
raise WebFingerError(f"Could not determine actor URL for {account}")
except WebFingerError:
raise
except Exception as e:
raise WebFingerError(f"Actor URL lookup failed: {e}")
async def get_actor_data(self, url: str) -> Dict[str, Any]:
"""
Fetch actor data from URL.
Args:
url: Actor URL
Returns:
Actor data
Raises:
WebFingerError: If actor fetch fails
"""
try:
headers = {
"Accept": "application/activity+json",
"User-Agent": self.config.user_agent
}
timeout = aiohttp.ClientTimeout(total=self.config.timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers) as response:
if response.status != 200:
raise WebFingerError(
f"Failed to fetch actor: {response.status}"
)
data = await response.json()
# Validate data
if not isinstance(data, dict):
raise WebFingerError("Invalid actor data format")
required_fields = {'id', 'type'}
if not all(field in data for field in required_fields):
raise WebFingerError("Missing required actor fields")
return data
except aiohttp.ClientError as e:
raise WebFingerError(f"Network error: {e}")
except json.JSONDecodeError as e:
raise WebFingerError(f"Invalid JSON response: {e}")
except Exception as e:
raise WebFingerError(f"Failed to fetch actor data: {e}")
async def _fetch_webfinger(self, domain: str, resource: str) -> Dict[str, Any]:
"""Make WebFinger request with security checks."""
try:
# Validate domain and build URL
if not self._is_valid_domain(domain):
raise WebFingerError(f"Invalid domain: {domain}")
webfinger_url = f"https://{domain}/.well-known/webfinger"
params = {"resource": resource}
headers = {
"Accept": "application/jrd+json",
"User-Agent": self.config.user_agent
}
timeout = aiohttp.ClientTimeout(total=self.config.timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(
webfinger_url,
params=params,
headers=headers,
max_redirects=self.config.max_redirects
) as response:
if response.status != 200:
raise WebFingerError(
f"WebFinger request failed: {response.status}"
)
data = await response.json()
# Validate response
if not isinstance(data, dict):
raise WebFingerError("Invalid WebFinger response format")
if 'subject' not in data:
raise WebFingerError("Missing required field: subject")
return data
except Exception as e:
raise WebFingerError(f"WebFinger request failed: {e}")
def _get_domain(self, resource: str) -> Optional[str]:
"""Extract domain from resource with validation."""
try:
if resource.startswith('acct:'):
_, address = resource.split(':', 1)
if '@' in address:
return address.split('@')[1]
else:
parsed = urlparse(resource)
if parsed.netloc:
return parsed.netloc
return None
except Exception:
return None
def _is_valid_domain(self, domain: str) -> bool:
"""Validate domain name."""
if not domain:
return False
# Add your domain validation logic here
return True
async def cleanup(self):
"""Cleanup resources."""
self.cache.clear()
\ No newline at end of file
[pytest] [pytest]
asyncio_mode = auto
filterwarnings = filterwarnings =
ignore::DeprecationWarning ignore::DeprecationWarning
pythonpath = ../src pythonpath = ../src
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment