Skip to content
Snippets Groups Projects
Verified Commit 33190613 authored by Eliot Berriot's avatar Eliot Berriot
Browse files

Fix #737: delivering of local activities causing unintended side effects, such...

Fix #737: delivering of local activities causing unintended side effects, such as rollbacking changes
parent 4ef52ebc
No related branches found
No related tags found
No related merge requests found
...@@ -173,69 +173,76 @@ class Router: ...@@ -173,69 +173,76 @@ class Router:
class InboxRouter(Router): class InboxRouter(Router):
def get_matching_handlers(self, payload):
return [
handler for route, handler in self.routes if match_route(route, payload)
]
@transaction.atomic @transaction.atomic
def dispatch(self, payload, context): def dispatch(self, payload, context, call_handlers=True):
""" """
Receives an Activity payload and some context and trigger our Receives an Activity payload and some context and trigger our
business logic business logic.
call_handlers should be False when are delivering a local activity, because
we want only want to bind activities to their recipients, not reapply the changes.
""" """
from . import api_serializers from . import api_serializers
from . import models from . import models
for route, handler in self.routes: handlers = self.get_matching_handlers(payload)
if match_route(route, payload): for handler in handlers:
if call_handlers:
r = handler(payload, context=context) r = handler(payload, context=context)
activity_obj = context.get("activity") else:
if activity_obj and r: r = None
# handler returned additional data we can use activity_obj = context.get("activity")
# to update the activity target if activity_obj and r:
for key, value in r.items(): # handler returned additional data we can use
setattr(activity_obj, key, value) # to update the activity target
for key, value in r.items():
update_fields = [] setattr(activity_obj, key, value)
for k in r.keys():
if k in ["object", "target", "related_object"]: update_fields = []
update_fields += [ for k in r.keys():
"{}_id".format(k), if k in ["object", "target", "related_object"]:
"{}_content_type".format(k), update_fields += [
] "{}_id".format(k),
else: "{}_content_type".format(k),
update_fields.append(k) ]
activity_obj.save(update_fields=update_fields) else:
update_fields.append(k)
if payload["type"] not in BROADCAST_TO_USER_ACTIVITIES: activity_obj.save(update_fields=update_fields)
return
if payload["type"] not in BROADCAST_TO_USER_ACTIVITIES:
inbox_items = context.get( return
"inbox_items", models.InboxItem.objects.none()
) inbox_items = context.get("inbox_items", models.InboxItem.objects.none())
inbox_items = ( inbox_items = (
inbox_items.select_related() inbox_items.select_related()
.select_related("actor__user") .select_related("actor__user")
.prefetch_related( .prefetch_related(
"activity__object", "activity__object", "activity__target", "activity__related_object"
"activity__target",
"activity__related_object",
)
) )
)
for ii in inbox_items: for ii in inbox_items:
user = ii.actor.get_user() user = ii.actor.get_user()
if not user: if not user:
continue continue
group = "user.{}.inbox".format(user.pk) group = "user.{}.inbox".format(user.pk)
channels.group_send( channels.group_send(
group, group,
{ {
"type": "event.send", "type": "event.send",
"text": "", "text": "",
"data": { "data": {
"type": "inbox.item_added", "type": "inbox.item_added",
"item": api_serializers.InboxItemSerializer(ii).data, "item": api_serializers.InboxItemSerializer(ii).data,
},
}, },
) },
return )
return
ACTOR_KEY_ROTATION_LOCK_CACHE_KEY = "federation:actor-key-rotation-lock:{}" ACTOR_KEY_ROTATION_LOCK_CACHE_KEY = "federation:actor-key-rotation-lock:{}"
......
...@@ -71,7 +71,7 @@ def get_files(storage, *parts): ...@@ -71,7 +71,7 @@ def get_files(storage, *parts):
@celery.app.task(name="federation.dispatch_inbox") @celery.app.task(name="federation.dispatch_inbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity") @celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_inbox(activity): def dispatch_inbox(activity, call_handlers=True):
""" """
Given an activity instance, triggers our internal delivery logic (follow Given an activity instance, triggers our internal delivery logic (follow
creation, etc.) creation, etc.)
...@@ -84,6 +84,7 @@ def dispatch_inbox(activity): ...@@ -84,6 +84,7 @@ def dispatch_inbox(activity):
"actor": activity.actor, "actor": activity.actor,
"inbox_items": activity.inbox_items.filter(is_read=False), "inbox_items": activity.inbox_items.filter(is_read=False),
}, },
call_handlers=call_handlers,
) )
...@@ -96,7 +97,7 @@ def dispatch_outbox(activity): ...@@ -96,7 +97,7 @@ def dispatch_outbox(activity):
inbox_items = activity.inbox_items.filter(is_read=False).select_related() inbox_items = activity.inbox_items.filter(is_read=False).select_related()
if inbox_items.exists(): if inbox_items.exists():
dispatch_inbox.delay(activity_id=activity.pk) dispatch_inbox.delay(activity_id=activity.pk, call_handlers=False)
if not preferences.get("federation__enabled"): if not preferences.get("federation__enabled"):
# federation is disabled, we only deliver to local recipients # federation is disabled, we only deliver to local recipients
......
...@@ -190,6 +190,16 @@ def test_inbox_routing(factories, mocker): ...@@ -190,6 +190,16 @@ def test_inbox_routing(factories, mocker):
assert a.target == target assert a.target == target
def test_inbox_routing_no_handler(factories, mocker):
router = activity.InboxRouter()
a = factories["federation.Activity"](type="Follow")
handler = mocker.Mock()
router.connect({"type": "Follow"}, handler)
router.dispatch({"type": "Follow"}, context={"activity": a}, call_handlers=False)
handler.assert_not_called()
def test_inbox_routing_send_to_channel(factories, mocker): def test_inbox_routing_send_to_channel(factories, mocker):
group_send = mocker.patch("funkwhale_api.common.channels.group_send") group_send = mocker.patch("funkwhale_api.common.channels.group_send")
a = factories["federation.Activity"](type="Follow") a = factories["federation.Activity"](type="Follow")
......
...@@ -74,10 +74,12 @@ def test_handle_in(factories, mocker, now, queryset_equal_list): ...@@ -74,10 +74,12 @@ def test_handle_in(factories, mocker, now, queryset_equal_list):
a = factories["federation.Activity"](payload={"hello": "world"}) a = factories["federation.Activity"](payload={"hello": "world"})
ii1 = factories["federation.InboxItem"](activity=a, actor=r1) ii1 = factories["federation.InboxItem"](activity=a, actor=r1)
ii2 = factories["federation.InboxItem"](activity=a, actor=r2) ii2 = factories["federation.InboxItem"](activity=a, actor=r2)
tasks.dispatch_inbox(activity_id=a.pk) tasks.dispatch_inbox(activity_id=a.pk, call_handlers=False)
mocked_dispatch.assert_called_once_with( mocked_dispatch.assert_called_once_with(
a.payload, context={"actor": a.actor, "activity": a, "inbox_items": [ii1, ii2]} a.payload,
context={"actor": a.actor, "activity": a, "inbox_items": [ii1, ii2]},
call_handlers=False,
) )
...@@ -90,7 +92,7 @@ def test_dispatch_outbox(factories, mocker): ...@@ -90,7 +92,7 @@ def test_dispatch_outbox(factories, mocker):
factories["federation.InboxItem"](activity=activity) factories["federation.InboxItem"](activity=activity)
delivery = factories["federation.Delivery"](activity=activity) delivery = factories["federation.Delivery"](activity=activity)
tasks.dispatch_outbox(activity_id=activity.pk) tasks.dispatch_outbox(activity_id=activity.pk)
mocked_inbox.assert_called_once_with(activity_id=activity.pk) mocked_inbox.assert_called_once_with(activity_id=activity.pk, call_handlers=False)
mocked_deliver_to_remote.assert_called_once_with(delivery_id=delivery.pk) mocked_deliver_to_remote.assert_called_once_with(delivery_id=delivery.pk)
...@@ -104,7 +106,7 @@ def test_dispatch_outbox_disabled_federation(factories, mocker, preferences): ...@@ -104,7 +106,7 @@ def test_dispatch_outbox_disabled_federation(factories, mocker, preferences):
factories["federation.InboxItem"](activity=activity) factories["federation.InboxItem"](activity=activity)
factories["federation.Delivery"](activity=activity) factories["federation.Delivery"](activity=activity)
tasks.dispatch_outbox(activity_id=activity.pk) tasks.dispatch_outbox(activity_id=activity.pk)
mocked_inbox.assert_called_once_with(activity_id=activity.pk) mocked_inbox.assert_called_once_with(activity_id=activity.pk, call_handlers=False)
mocked_deliver_to_remote.assert_not_called() mocked_deliver_to_remote.assert_not_called()
......
Fixed delivering of local activities causing unintended side effects, such as rollbacking changes (#737)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment