From 2b09157c2afcbfc683f178700f9a851f730ba5a2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 Aug 2025 14:06:55 +0000 Subject: [PATCH] Add comprehensive handling for blocked instance errors in activity processing Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com> --- .../queue/processors/InboxProcessorService.ts | 3 + .../queue/processors/InboxProcessorService.ts | 129 +++++++++++++++++- 2 files changed, 128 insertions(+), 4 deletions(-) diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 4aa0779d6e..248f0791da 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -239,6 +239,9 @@ export class InboxProcessorService implements OnApplicationShutdown { if (e.id === 'd450b8a9-48e4-4dab-ae36-f4db763fda7c') { // invalid Note return e.message; } + if (e.id === '09d79f9e-64f1-4316-9cfa-e75c4d091574') { // Instance is blocked + return 'skip: blocked instance'; + } } throw e; } diff --git a/packages/backend/test/unit/queue/processors/InboxProcessorService.ts b/packages/backend/test/unit/queue/processors/InboxProcessorService.ts index be833fb17a..fc4fea520c 100644 --- a/packages/backend/test/unit/queue/processors/InboxProcessorService.ts +++ b/packages/backend/test/unit/queue/processors/InboxProcessorService.ts @@ -13,6 +13,7 @@ import * as Bull from 'bullmq'; import { InboxProcessorService } from '@/queue/processors/InboxProcessorService.js'; import { ApDbResolverService } from '@/core/activitypub/ApDbResolverService.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; +import { StatusError } from '@/misc/status-error.js'; import { GlobalModule } from '@/GlobalModule.js'; import { CoreModule } from '@/core/CoreModule.js'; import { MiMeta } from '@/models/_.js'; @@ -42,7 +43,7 @@ describe('InboxProcessorService', () => { describe('process', () => { test('should skip jobs when actor is from blocked instance via relay', async () => { - // Mock getAuthUserFromKeyId to return null (not found) + // Mock getAuthUserFromKeyId to return null (simulating relay scenario where keyId host differs) jest.spyOn(apDbResolverService, 'getAuthUserFromKeyId').mockResolvedValue(null); // Mock getAuthUserFromApId to throw "Instance is blocked" error @@ -52,11 +53,11 @@ describe('InboxProcessorService', () => { const jobData = { signature: { - keyId: 'https://relay.example.com/actor#main-key', + keyId: 'https://relay.example.com/actor#main-key', // Different from actor host }, activity: { type: 'Create', - actor: 'https://blocked.example.com/users/testuser', + actor: 'https://blocked.example.com/users/testuser', // Blocked instance id: 'https://blocked.example.com/activities/1', object: { type: 'Note', @@ -71,7 +72,7 @@ describe('InboxProcessorService', () => { data: jobData, } as Bull.Job; - // Should throw UnrecoverableError with skip message + // Should throw UnrecoverableError with skip message instead of retrying await assert.rejects( inboxProcessorService.process(job), (err: any) => { @@ -80,5 +81,125 @@ describe('InboxProcessorService', () => { } ); }); + + test('should skip jobs when blocked instance error occurs during activity processing', async () => { + // Mock successful user resolution + jest.spyOn(apDbResolverService, 'getAuthUserFromKeyId').mockResolvedValue({ + user: { id: 'user1', uri: 'https://relay.example.com/users/relay' } as any, + key: { keyPem: 'fake-key' } as any, + }); + + // Mock apInboxService.performActivity to throw "Instance is blocked" error + // This simulates the error occurring during object resolution in performActivity + const mockPerformActivity = jest.fn().mockRejectedValue( + new IdentifiableError('09d79f9e-64f1-4316-9cfa-e75c4d091574', 'Instance is blocked') + ); + + // We need to mock the entire service since it's private + Object.defineProperty(inboxProcessorService, 'apInboxService', { + value: { performActivity: mockPerformActivity }, + writable: true, + }); + + const jobData = { + signature: { + keyId: 'https://relay.example.com/actor#main-key', + }, + activity: { + type: 'Create', + actor: 'https://relay.example.com/users/relay', + id: 'https://relay.example.com/activities/1', + object: 'https://blocked.example.com/notes/1', // Reference to blocked instance + }, + }; + + const job = { + data: jobData, + } as Bull.Job; + + // Should return skip message instead of throwing + const result = await inboxProcessorService.process(job); + assert.strictEqual(result, 'skip: blocked instance'); + }); + + test('should handle other errors normally (not affected by the fix)', async () => { + // Mock getAuthUserFromKeyId to return null + jest.spyOn(apDbResolverService, 'getAuthUserFromKeyId').mockResolvedValue(null); + + // Mock getAuthUserFromApId to throw a different IdentifiableError + jest.spyOn(apDbResolverService, 'getAuthUserFromApId').mockRejectedValue( + new IdentifiableError('some-other-error-id', 'Some other error') + ); + + const jobData = { + signature: { + keyId: 'https://example.com/actor#main-key', + }, + activity: { + type: 'Create', + actor: 'https://example.com/users/testuser', + id: 'https://example.com/activities/1', + object: { + type: 'Note', + id: 'https://example.com/notes/1', + content: 'test note', + attributedTo: 'https://example.com/users/testuser', + }, + }, + }; + + const job = { + data: jobData, + } as Bull.Job; + + // Should NOT catch this error and let it propagate (preserving existing behavior) + await assert.rejects( + inboxProcessorService.process(job), + (err: any) => { + return err instanceof IdentifiableError && + err.id === 'some-other-error-id'; + } + ); + }); + + test('should still handle StatusError as before', async () => { + // Mock getAuthUserFromKeyId to return null + jest.spyOn(apDbResolverService, 'getAuthUserFromKeyId').mockResolvedValue(null); + + // Mock getAuthUserFromApId to throw a non-retryable StatusError + jest.spyOn(apDbResolverService, 'getAuthUserFromApId').mockRejectedValue( + new StatusError('Not Found', 404, 'User not found') + ); + + const jobData = { + signature: { + keyId: 'https://example.com/actor#main-key', + }, + activity: { + type: 'Create', + actor: 'https://example.com/users/deleted', + id: 'https://example.com/activities/1', + object: { + type: 'Note', + id: 'https://example.com/notes/1', + content: 'test note', + attributedTo: 'https://example.com/users/deleted', + }, + }, + }; + + const job = { + data: jobData, + } as Bull.Job; + + // Should handle StatusError as before (UnrecoverableError for non-retryable) + await assert.rejects( + inboxProcessorService.process(job), + (err: any) => { + return err instanceof Bull.UnrecoverableError && + err.message.includes('skip: Ignored deleted actors'); + } + ); + }); }); }); \ No newline at end of file