tasks.py 20.8 KB
Newer Older
1
import datetime
2
import json
3
import logging
4
import os
Eliot Berriot's avatar
Eliot Berriot committed
5
import requests
6
7

from django.conf import settings
8
from django.db import transaction
9
from django.db.models import Q, F
Eliot Berriot's avatar
Eliot Berriot committed
10
from django.db.models.deletion import Collector
11
from django.utils import timezone
12
from dynamic_preferences.registries import global_preferences_registry
Eliot Berriot's avatar
Eliot Berriot committed
13
from requests.exceptions import RequestException
Eliot Berriot's avatar
Eliot Berriot committed
14

Eliot Berriot's avatar
Eliot Berriot committed
15
from funkwhale_api.audio import models as audio_models
16
from funkwhale_api.common import preferences
Eliot Berriot's avatar
Eliot Berriot committed
17
from funkwhale_api.common import models as common_models
18
from funkwhale_api.common import session
Eliot Berriot's avatar
Eliot Berriot committed
19
from funkwhale_api.common import utils as common_utils
20
from funkwhale_api.moderation import mrf
21
from funkwhale_api.music import models as music_models
22
23
from funkwhale_api.taskapp import celery

24
from . import activity
25
from . import actors
Agate's avatar
Agate committed
26
from . import exceptions
27
from . import jsonld
28
from . import keys
Eliot Berriot's avatar
Eliot Berriot committed
29
from . import models, signing
Eliot Berriot's avatar
Eliot Berriot committed
30
from . import serializers
31
from . import routes
Eliot Berriot's avatar
Eliot Berriot committed
32
from . import utils
33
from . import webfinger
34
35
36
37

logger = logging.getLogger(__name__)


Eliot Berriot's avatar
Eliot Berriot committed
38
@celery.app.task(name="federation.clean_music_cache")
def clean_music_cache():
    """
    Remove locally cached audio files of remote uploads that weren't accessed
    for longer than the configured cache duration, then delete any orphaned
    file left in the federation cache directory.
    """
    # renamed from ``preferences`` to avoid shadowing the
    # funkwhale_api.common.preferences module imported at file level
    manager = global_preferences_registry.manager()
    delay = manager["federation__music_cache_duration"]
    if delay < 1:
        return  # cache clearing disabled
    limit = timezone.now() - datetime.timedelta(minutes=delay)

    # remote uploads with a cached audio file that wasn't accessed recently
    candidates = (
        music_models.Upload.objects.filter(
            Q(audio_file__isnull=False)
            & (Q(accessed_date__lt=limit) | Q(accessed_date=None)),
        )
        .local(False)
        .exclude(audio_file="")
        .filter(Q(source__startswith="http://") | Q(source__startswith="https://"))
        .only("audio_file", "id")
        .order_by("id")
    )
    for upload in candidates:
        upload.audio_file.delete()

    # we also delete orphaned files, if any
    storage = models.LibraryTrack._meta.get_field("audio_file").storage
    files = get_files(storage, "federation_cache/tracks")
    existing = music_models.Upload.objects.filter(audio_file__in=files)
    missing = set(files) - set(existing.values_list("audio_file", flat=True))
    for path in missing:
        storage.delete(path)


def get_files(storage, *parts):
    """
    Recursively collect every file available under the given directory
    using django's storage API.

    Returned paths are prefixed with ``parts[-1]`` at each recursion
    level, so the top-level call yields paths rooted at the requested
    directory.

    Raises ValueError when no path component is given.
    """
    if not parts:
        raise ValueError("Missing path")

    try:
        subdirectories, files = storage.listdir(os.path.join(*parts))
    except FileNotFoundError:
        # the directory does not exist (yet): nothing to list
        return []
    # ``subdirectory`` instead of ``dir`` to avoid shadowing the builtin
    for subdirectory in subdirectories:
        files += get_files(storage, *(list(parts) + [subdirectory]))
    return [os.path.join(parts[-1], path) for path in files]
84
85
86
87


@celery.app.task(name="federation.dispatch_inbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_inbox(activity, call_handlers=True):
    """
    Given an activity instance, triggers our internal delivery logic (follow
    creation, etc.)
    """
    unread_items = activity.inbox_items.filter(is_read=False).order_by("id")
    dispatch_context = {
        "activity": activity,
        "actor": activity.actor,
        "inbox_items": unread_items,
    }
    routes.inbox.dispatch(
        activity.payload,
        context=dispatch_context,
        call_handlers=call_handlers,
    )
103
104
105
106
107
108


@celery.app.task(name="federation.dispatch_outbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_outbox(activity):
    """
    Deliver a local activity to its recipients, both locally and remotely
    """
    inbox_items = activity.inbox_items.filter(is_read=False).select_related()

    if inbox_items.exists():
        # only Follow activities trigger local handler execution here
        call_handlers = activity.type in ["Follow"]
        dispatch_inbox.delay(activity_id=activity.pk, call_handlers=call_handlers)

    if not preferences.get("federation__enabled"):
        # federation is disabled, we only deliver to local recipients
        return

    deliveries = activity.deliveries.filter(is_delivered=False)

    # ``delivery_id`` instead of ``id`` to avoid shadowing the builtin
    for delivery_id in deliveries.values_list("pk", flat=True):
        deliver_to_remote.delay(delivery_id=delivery_id)
125
126
127
128
129
130
131
132


def _record_delivery_attempt(delivery, delivered):
    # stamp the attempt date and bump the counter atomically via an
    # F() expression; mark delivered only on success
    delivery.last_attempt_date = timezone.now()
    delivery.attempts = F("attempts") + 1
    fields = ["last_attempt_date", "attempts"]
    if delivered:
        delivery.is_delivered = True
        fields.append("is_delivered")
    delivery.save(update_fields=fields)


@celery.app.task(
    name="federation.deliver_to_remote_inbox",
    autoretry_for=[RequestException],
    retry_backoff=30,
    max_retries=5,
)
@celery.require_instance(
    models.Delivery.objects.filter(is_delivered=False).select_related(
        "activity__actor"
    ),
    "delivery",
)
def deliver_to_remote(delivery):
    """
    POST a pending delivery's activity payload to its remote inbox, signing
    the request with the emitting actor's key. Failed attempts re-raise so
    Celery's autoretry kicks in.
    """
    if not preferences.get("federation__enabled"):
        # federation is disabled, we only deliver to local recipients
        return

    sender = delivery.activity.actor
    logger.info("Preparing activity delivery to %s", delivery.inbox_url)
    auth = signing.get_auth(sender.private_key, sender.private_key_id)
    try:
        response = session.get_session().post(
            auth=auth,
            json=delivery.activity.payload,
            url=delivery.inbox_url,
            headers={"Content-Type": "application/activity+json"},
        )
        logger.debug("Remote answered with %s", response.status_code)
        response.raise_for_status()
    except Exception:
        _record_delivery_attempt(delivery, delivered=False)
        raise
    else:
        _record_delivery_attempt(delivery, delivered=True)
Eliot Berriot's avatar
Eliot Berriot committed
167
168
169
170
171


def fetch_nodeinfo(domain_name):
    """
    Retrieve and validate the nodeinfo 2.0 payload for the given domain.

    Raises ValueError when the well-known document advertises no
    nodeinfo 2.0 schema; callers (update_domain_nodeinfo) already treat
    ValueError as a fetch error.
    """
    s = session.get_session()
    wellknown_url = "https://{}/.well-known/nodeinfo".format(domain_name)
    response = s.get(url=wellknown_url)
    response.raise_for_status()
    serializer = serializers.NodeInfoSerializer(data=response.json())
    serializer.is_valid(raise_exception=True)
    nodeinfo_url = None
    for link in serializer.validated_data["links"]:
        if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
            nodeinfo_url = link["href"]
            break

    if not nodeinfo_url:
        # previously this fell through to s.get(url=None) and failed with an
        # obscure requests error; fail explicitly instead
        raise ValueError(
            "No nodeinfo 2.0 schema found for domain {}".format(domain_name)
        )

    response = s.get(url=nodeinfo_url)
    response.raise_for_status()
    return response.json()


@celery.app.task(name="federation.update_domain_nodeinfo")
@celery.require_instance(
    models.Domain.objects.external(), "domain", id_kwarg_name="domain_name"
)
def update_domain_nodeinfo(domain):
    """
    Refresh the stored nodeinfo payload for an external domain, and try to
    resolve the service actor advertised in the nodeinfo metadata.
    """
    fetch_time = timezone.now()
    try:
        nodeinfo = {"status": "ok", "payload": fetch_nodeinfo(domain.name)}
    except (
        requests.RequestException,
        serializers.serializers.ValidationError,
        ValueError,
    ) as exc:
        # store the failure so the fetch date still advances
        nodeinfo = {"status": "error", "error": str(exc)}

    service_actor_id = common_utils.recursive_getattr(
        nodeinfo, "payload.metadata.actorId", permissive=True
    )
    try:
        if service_actor_id:
            domain.service_actor = utils.retrieve_ap_object(
                service_actor_id,
                actor=actors.get_service_actor(),
                queryset=models.Actor,
                serializer_class=serializers.ActorSerializer,
            )
        else:
            domain.service_actor = None
    except (
        serializers.serializers.ValidationError,
        RequestException,
        exceptions.BlockedActorOrDomain,
    ) as exc:
        # keep the previous service actor on failure, just log it
        logger.warning(
            "Cannot fetch system actor for domain %s: %s", domain.name, str(exc)
        )
    domain.nodeinfo_fetch_date = fetch_time
    domain.nodeinfo = nodeinfo
    domain.save(update_fields=["nodeinfo", "nodeinfo_fetch_date", "service_actor"])
227
228


229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
@celery.app.task(name="federation.refresh_nodeinfo_known_nodes")
def refresh_nodeinfo_known_nodes():
    """
    Trigger a node info refresh on all nodes that weren't refreshed since
    settings.NODEINFO_REFRESH_DELAY
    """
    threshold = timezone.now() - datetime.timedelta(
        seconds=settings.NODEINFO_REFRESH_DELAY
    )
    outdated = models.Domain.objects.external().exclude(
        nodeinfo_fetch_date__gte=threshold
    )
    domain_names = outdated.values_list("name", flat=True)
    logger.info(
        "Launching periodic nodeinfo refresh on %s domains", len(domain_names)
    )
    for name in domain_names:
        update_domain_nodeinfo.delay(domain_name=name)


245
246
247
248
249
250
251
252
253
254
def delete_qs(qs):
    """Delete every object in the queryset and log a purge summary."""
    model_label = qs.model._meta.label
    # QuerySet.delete() returns (total, {model_label: count, ...})
    deleted_total, per_model_counts = qs.delete()
    related_total = sum(per_model_counts.values())

    logger.info(
        "Purged %s %s objects (and %s related entities)",
        deleted_total,
        model_label,
        related_total,
    )


255
256
257
258
259
def handle_purge_actors(ids, only=None):
    """
    Purge database objects associated with the given actor ids.

    Empty (or None) ``only`` means we purge everything.
    Otherwise, we purge only the requested bits: media
    """
    # mutable default argument replaced by a None sentinel; the falsy
    # default behaves exactly like the previous empty list
    only = only or []
    # purge follows (received emitted)
    if not only:
        delete_qs(models.LibraryFollow.objects.filter(target__actor_id__in=ids))
        delete_qs(models.Follow.objects.filter(actor_id__in=ids))

    # purge audio content
    if not only or "media" in only:
        delete_qs(common_models.Attachment.objects.filter(actor__in=ids))
        delete_qs(models.LibraryFollow.objects.filter(actor_id__in=ids))
        delete_qs(models.Follow.objects.filter(target_id__in=ids))
        delete_qs(audio_models.Channel.objects.filter(attributed_to__in=ids))
        delete_qs(audio_models.Channel.objects.filter(actor__in=ids))
        delete_qs(music_models.Upload.objects.filter(library__actor_id__in=ids))
        delete_qs(music_models.Library.objects.filter(actor_id__in=ids))

    # purge remaining activities / deliveries
    if not only:
        delete_qs(models.InboxItem.objects.filter(actor_id__in=ids))
        delete_qs(models.Activity.objects.filter(actor_id__in=ids))
279
280
281


@celery.app.task(name="federation.purge_actors")
def purge_actors(ids=None, domains=None, only=None):
    """
    Purge objects related to the actors matching the given ids and/or
    domain names.

    Mutable default arguments were replaced by None sentinels; the falsy
    defaults behave exactly like the previous empty lists.
    """
    ids = ids or []
    domains = domains or []
    only = only or []
    actors = models.Actor.objects.filter(
        Q(id__in=ids) | Q(domain_id__in=domains)
    ).order_by("id")
    found_ids = list(actors.values_list("id", flat=True))
    logger.info("Starting purging %s accounts", len(found_ids))
    handle_purge_actors(ids=found_ids, only=only)
289
290
291
292
293
294
295
296
297


@celery.app.task(name="federation.rotate_actor_key")
@celery.require_instance(models.Actor.objects.local(), "actor")
def rotate_actor_key(actor):
    """Generate a fresh key pair for a local actor and persist it."""
    private, public = keys.get_key_pair()
    actor.private_key = private.decode()
    actor.public_key = public.decode()
    actor.save(update_fields=["private_key", "public_key"])
298
299
300
301
302


@celery.app.task(name="federation.fetch")
@transaction.atomic
@celery.require_instance(
    models.Fetch.objects.filter(status="pending").select_related("actor"),
    "fetch_obj",
    "fetch_id",
)
def fetch(fetch_obj):
    """
    Resolve a pending Fetch object: download the remote ActivityPub
    document (after MRF and optional webfinger resolution), pick a matching
    serializer based on the expanded JSON-LD @type, persist the result and
    update the Fetch's status/detail accordingly.
    """

    def error(code, **kwargs):
        # mark the fetch as errored and store the error detail; callers
        # ``return error(...)`` to abort the task
        fetch_obj.status = "errored"
        fetch_obj.fetch_date = timezone.now()
        fetch_obj.detail = {"error_code": code}
        fetch_obj.detail.update(kwargs)
        fetch_obj.save(update_fields=["fetch_date", "status", "detail"])

    url = fetch_obj.url
    mrf_check_url = url
    # plain URLs are MRF-checked up front; webfinger:// URLs are checked
    # later, once resolved to an ActivityPub id
    if not mrf_check_url.startswith("webfinger://"):
        payload, updated = mrf.inbox.apply({"id": mrf_check_url})
        if not payload:
            return error("blocked", message="Blocked by MRF")

    actor = fetch_obj.actor
    if settings.FEDERATION_AUTHENTIFY_FETCHES:
        auth = signing.get_auth(actor.private_key, actor.private_key_id)
    else:
        auth = None
    try:
        if url.startswith("webfinger://"):
            # we first grab the corresponding webfinger representation
            # to get the ActivityPub actor ID
            webfinger_data = webfinger.get_resource(
                "acct:" + url.replace("webfinger://", "")
            )
            url = webfinger.get_ap_url(webfinger_data["links"])
            if not url:
                return error("webfinger", message="Invalid or missing webfinger data")
            payload, updated = mrf.inbox.apply({"id": url})
            if not payload:
                return error("blocked", message="Blocked by MRF")
        response = session.get_session().get(
            auth=auth, url=url, headers={"Accept": "application/activity+json"},
        )
        logger.debug("Remote answered with %s: %s", response.status_code, response.text)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # HTTPError is only raised by raise_for_status(), so ``response``
        # is guaranteed to be bound here
        return error(
            "http",
            status_code=e.response.status_code if e.response else None,
            message=response.text,
        )
    except requests.exceptions.Timeout:
        return error("timeout")
    except requests.exceptions.ConnectionError as e:
        return error("connection", message=str(e))
    except requests.RequestException as e:
        return error("request", message=str(e))
    except Exception as e:
        return error("unhandled", message=str(e))

    try:
        payload = response.json()
    except json.decoder.JSONDecodeError:
        # we attempt to extract a <link rel=alternate> that points
        # to an activity pub resource, if possible, and retry with this URL
        alternate_url = utils.find_alternate(response.text)
        if alternate_url:
            fetch_obj.url = alternate_url
            fetch_obj.save(update_fields=["url"])
            # re-enter the task synchronously with the alternate URL
            return fetch(fetch_id=fetch_obj.pk)
        return error("invalid_json")

    # second MRF pass, this time on the full fetched payload
    payload, updated = mrf.inbox.apply(payload)
    if not payload:
        return error("blocked", message="Blocked by MRF")

    try:
        doc = jsonld.expand(payload)
    except ValueError:
        return error("invalid_jsonld")

    try:
        type = doc.get("@type", [])[0]
    except IndexError:
        return error("missing_jsonld_type")
    try:
        # fetch_obj.serializers maps a JSON-LD type to an ordered list of
        # candidate serializer classes; the first one's model drives the
        # "existing object" lookup below
        serializer_classes = fetch_obj.serializers[type]
        model = serializer_classes[0].Meta.model
    except (KeyError, AttributeError):
        # unknown type: record a skip instead of an error
        fetch_obj.status = "skipped"
        fetch_obj.fetch_date = timezone.now()
        fetch_obj.detail = {"reason": "unhandled_type", "type": type}
        return fetch_obj.save(update_fields=["fetch_date", "status", "detail"])
    try:
        # NOTE(review): dict.get never raises IndexError, so the except
        # branch below looks unreachable — confirm before removing
        id = doc.get("@id")
    except IndexError:
        existing = None
    else:
        existing = model.objects.filter(fid=id).first()

    # try each candidate serializer in order, keeping the first valid one
    # (or the last invalid one, whose errors are reported)
    serializer = None
    for serializer_class in serializer_classes:
        serializer = serializer_class(
            existing, data=payload, context={"fetch_actor": actor}
        )
        if not serializer.is_valid():
            continue
        else:
            break
    if serializer.errors:
        return error("validation", validation_errors=serializer.errors)
    try:
        obj = serializer.save()
    except Exception as e:
        # record the failure, then re-raise so the task is marked failed
        error("save", message=str(e))
        raise

    # special case for channels
    # when obj is an actor, we check if the actor has a channel associated with it
    # if it is the case, we consider the fetch obj to be a channel instead
    # and also trigger a fetch on the channel outbox
    if isinstance(obj, models.Actor) and obj.get_channel():
        obj = obj.get_channel()
        if obj.actor.outbox_url:
            try:
                # first page fetch is synchronous, so that at least some data is available
                # in the UI after subscription
                result = fetch_collection(
                    obj.actor.outbox_url, channel_id=obj.pk, max_pages=1,
                )
            except Exception:
                logger.exception(
                    "Error while fetching actor outbox: %s", obj.actor.outbox_url
                )
            else:
                if result.get("next_page"):
                    # additional pages are fetched in the background
                    result = fetch_collection.delay(
                        result["next_page"],
                        channel_id=obj.pk,
                        max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1,
                        is_page=True,
                    )

    fetch_obj.object = obj
    fetch_obj.status = "finished"
    fetch_obj.fetch_date = timezone.now()
    return fetch_obj.save(
        update_fields=["fetch_date", "status", "object_id", "object_content_type"]
    )
Eliot Berriot's avatar
Eliot Berriot committed
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511


class PreserveSomeDataCollector(Collector):
    """
    We need to delete everything related to an actor. Well… Almost everything.
    But definitely not the Delete Activity we send to announce the actor is deleted.
    """

    def __init__(self, *args, **kwargs):
        # remember when collection started, so we can tell the Delete
        # activity emitted during this run apart from older activities
        self.creation_date = timezone.now()
        super().__init__(*args, **kwargs)

    def related_objects(self, related, *args, **kwargs):
        # narrow the related queryset computed by Django's Collector
        qs = super().related_objects(related, *args, **kwargs)
        if related.name == "outbox_activities":
            # exclude the delete activity so it can be broadcasted properly
            qs = qs.exclude(type="Delete", creation_date__gte=self.creation_date)

        return qs


@celery.app.task(name="federation.remove_actor")
@transaction.atomic
@celery.require_instance(
    models.Actor.objects.all(), "actor",
)
def remove_actor(actor):
    """
    Delete everything associated with an actor, broadcast the deletion over
    federation, and keep the actor row itself as a Tombstone record.
    """
    # Then we broadcast the info over federation. We do this *before* deleting objects
    # associated with the actor, otherwise follows are removed and we don't know where
    # to broadcast
    logger.info("Broadcasting deletion to federation…")
    collector = PreserveSomeDataCollector(using="default")
    routes.outbox.dispatch(
        {"type": "Delete", "object": {"type": actor.type}}, context={"actor": actor}
    )

    # then we delete any object associated with the actor object, but *not* the actor
    # itself. We keep it for auditability and sending the Delete ActivityPub message
    logger.info(
        "Prepare deletion of objects associated with account %s…",
        actor.preferred_username,
    )
    collector.collect([actor])
    for model, instances in collector.data.items():
        if issubclass(model, actor.__class__):
            # we skip deletion of the actor itself
            continue

        to_delete = model.objects.filter(pk__in=[instance.pk for instance in instances])
        logger.info(
            "Deleting %s objects associated with account %s…",
            len(instances),
            actor.preferred_username,
        )
        to_delete.delete()

    # Finally, we update the actor itself and mark it as removed
    # (log message typo fixed: "Tombsone" -> "Tombstone")
    logger.info("Marking actor as Tombstone…")
    actor.type = "Tombstone"
    actor.name = None
    actor.summary = None
    actor.save(update_fields=["type", "name", "summary"])
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614


# maps an activity route matcher (as understood by activity.match_route) to
# the serializer class used to persist matching collection items
COLLECTION_ACTIVITY_SERIALIZERS = [
    (
        {"type": "Create", "object.type": "Audio"},
        serializers.ChannelCreateUploadSerializer,
    )
]


def match_serializer(payload, conf):
    """Return the serializer classes from ``conf`` whose route matches ``payload``."""
    matching = []
    for route, serializer_class in conf:
        if activity.match_route(route, payload):
            matching.append(serializer_class)
    return matching


@celery.app.task(name="federation.fetch_collection")
@celery.require_instance(
    audio_models.Channel.objects.all(), "channel", allow_null=True,
)
def fetch_collection(url, max_pages, channel, is_page=False):
    """
    Fetch an ActivityPub paginated collection and persist the items matching
    one of COLLECTION_ACTIVITY_SERIALIZERS.

    :param url: collection URL (or page URL when ``is_page`` is True)
    :param max_pages: maximum number of pages to traverse
    :param channel: optional channel, passed in the serializer context
    :param is_page: when True, ``url`` already points at a collection page
    :returns: dict of stats (items/skipped/errored/seen/total/next_page)
    """
    actor = actors.get_service_actor()
    results = {
        "items": [],
        "skipped": 0,
        "errored": 0,
        "seen": 0,
        "total": 0,
    }
    if is_page:
        # starting immediately from a page, no need to fetch the wrapping collection
        logger.debug("Fetch collection page immediately at %s", url)
        results["next_page"] = url
    else:
        logger.debug("Fetching collection object at %s", url)
        collection = utils.retrieve_ap_object(
            url,
            actor=actor,
            serializer_class=serializers.PaginatedCollectionSerializer,
        )
        results["next_page"] = collection["first"]
        results["total"] = collection.get("totalItems")

    context = {}
    if channel:
        context["channel"] = channel

    for i in range(max_pages):
        page_url = results["next_page"]
        logger.debug("Handling page %s on max %s, at %s", i + 1, max_pages, page_url)
        page = utils.retrieve_ap_object(page_url, actor=actor, serializer_class=None,)
        try:
            items = page["orderedItems"]
        except KeyError:
            try:
                items = page["items"]
            except KeyError:
                logger.error("Invalid collection page at %s", page_url)
                break

        for item in items:
            results["seen"] += 1

            matching_serializer = match_serializer(
                item, COLLECTION_ACTIVITY_SERIALIZERS
            )
            if not matching_serializer:
                results["skipped"] += 1
                logger.debug("Skipping unhandled activity %s", item.get("type"))
                continue

            s = matching_serializer[0](data=item, context=context)
            if not s.is_valid():
                # logger.warn is a deprecated alias of logger.warning
                logger.warning("Skipping invalid activity: %s", s.errors)
                results["errored"] += 1
                continue

            results["items"].append(s.save())

        results["next_page"] = page.get("next", None) or None
        if not results["next_page"]:
            logger.debug("No more pages to fetch")
            break

    logger.info(
        "Finished fetch of collection pages at %s. Results:\n"
        "  Total in collection: %s\n"
        "  Seen: %s\n"
        "  Handled: %s\n"
        "  Skipped: %s\n"
        "  Errored: %s",
        url,
        results.get("total"),
        results["seen"],
        len(results["items"]),
        results["skipped"],
        results["errored"],
    )
    return results