diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index d8a422d404..eeae280b09 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -9,7 +9,7 @@ import { dirname, resolve } from 'node:path'; import * as yaml from 'js-yaml'; import type { RedisOptions } from 'ioredis'; -type RedisOptionsSource = Partial & { +export type RedisOptionsSource = Partial & { host: string; port: number; family?: number; @@ -47,6 +47,14 @@ type Source = { redis: RedisOptionsSource; redisForPubsub?: RedisOptionsSource; redisForJobQueue?: RedisOptionsSource; + redisForSystemQueue?: RedisOptionsSource; + redisForEndedPollNotificationQueue?: RedisOptionsSource; + redisForDeliverQueue?: RedisOptionsSource; + redisForInboxQueue?: RedisOptionsSource; + redisForDbQueue?: RedisOptionsSource; + redisForRelationshipQueue?: RedisOptionsSource; + redisForObjectStorageQueue?: RedisOptionsSource; + redisForWebhookDeliverQueue?: RedisOptionsSource; redisForTimelines?: RedisOptionsSource; meilisearch?: { host: string; @@ -164,7 +172,14 @@ export type Config = { videoThumbnailGenerator: string | null; redis: RedisOptions & RedisOptionsSource; redisForPubsub: RedisOptions & RedisOptionsSource; - redisForJobQueue: RedisOptions & RedisOptionsSource; + redisForSystemQueue: RedisOptions & RedisOptionsSource; + redisForEndedPollNotificationQueue: RedisOptions & RedisOptionsSource; + redisForDeliverQueue: RedisOptions & RedisOptionsSource; + redisForInboxQueue: RedisOptions & RedisOptionsSource; + redisForDbQueue: RedisOptions & RedisOptionsSource; + redisForRelationshipQueue: RedisOptions & RedisOptionsSource; + redisForObjectStorageQueue: RedisOptions & RedisOptionsSource; + redisForWebhookDeliverQueue: RedisOptions & RedisOptionsSource; redisForTimelines: RedisOptions & RedisOptionsSource; perChannelMaxNoteCacheCount: number; perUserNotificationsMaxCount: number; @@ -209,6 +224,7 @@ export function loadConfig(): Config { : null; const internalMediaProxy = `${scheme}://${host}/proxy`; const redis = convertRedisOptions(config.redis, host); + const redisForJobQueue = config.redisForJobQueue ? convertRedisOptions(config.redisForJobQueue, host) : redis; return { version, @@ -231,7 +247,14 @@ export function loadConfig(): Config { meilisearch: config.meilisearch, redis, redisForPubsub: config.redisForPubsub ? convertRedisOptions(config.redisForPubsub, host) : redis, - redisForJobQueue: config.redisForJobQueue ? convertRedisOptions(config.redisForJobQueue, host) : redis, + redisForSystemQueue: config.redisForSystemQueue ? convertRedisOptions(config.redisForSystemQueue, host) : redisForJobQueue, + redisForEndedPollNotificationQueue: config.redisForEndedPollNotificationQueue ? convertRedisOptions(config.redisForEndedPollNotificationQueue, host) : redisForJobQueue, + redisForDeliverQueue: config.redisForDeliverQueue ? convertRedisOptions(config.redisForDeliverQueue, host) : redisForJobQueue, + redisForInboxQueue: config.redisForInboxQueue ? convertRedisOptions(config.redisForInboxQueue, host) : redisForJobQueue, + redisForDbQueue: config.redisForDbQueue ? convertRedisOptions(config.redisForDbQueue, host) : redisForJobQueue, + redisForRelationshipQueue: config.redisForRelationshipQueue ? convertRedisOptions(config.redisForRelationshipQueue, host) : redisForJobQueue, + redisForObjectStorageQueue: config.redisForObjectStorageQueue ? convertRedisOptions(config.redisForObjectStorageQueue, host) : redisForJobQueue, + redisForWebhookDeliverQueue: config.redisForWebhookDeliverQueue ? convertRedisOptions(config.redisForWebhookDeliverQueue, host) : redisForJobQueue, redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis, id: config.id, proxy: config.proxy, diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 4444dc9787..8657d7f540 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -23,49 +23,49 @@ export type WebhookDeliverQueue = Bull.Queue; const $system: Provider = { provide: 'queue:system', - useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config.redisForSystemQueue, QUEUE.SYSTEM)), inject: [DI.config], }; const $endedPollNotification: Provider = { provide: 'queue:endedPollNotification', - useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config.redisForEndedPollNotificationQueue, QUEUE.ENDED_POLL_NOTIFICATION)), inject: [DI.config], }; const $deliver: Provider = { provide: 'queue:deliver', - useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config.redisForDeliverQueue, QUEUE.DELIVER)), inject: [DI.config], }; const $inbox: Provider = { provide: 'queue:inbox', - useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config.redisForInboxQueue, QUEUE.INBOX)), inject: [DI.config], }; const $db: Provider = { provide: 'queue:db', - useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config.redisForDbQueue, QUEUE.DB)), inject: [DI.config], }; const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config.redisForRelationshipQueue, QUEUE.RELATIONSHIP)), inject: [DI.config], }; const $objectStorage: Provider = { provide: 'queue:objectStorage', - useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config.redisForObjectStorageQueue, QUEUE.OBJECT_STORAGE)), inject: [DI.config], }; const $webhookDeliver: Provider = { provide: 'queue:webhookDeliver', - useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config.redisForWebhookDeliverQueue, QUEUE.WEBHOOK_DELIVER)), inject: [DI.config], }; diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts index 5edc0f45ab..0efec9627b 100644 --- a/packages/backend/src/daemons/QueueStatsService.ts +++ b/packages/backend/src/daemons/QueueStatsService.ts @@ -43,8 +43,8 @@ export class QueueStatsService implements OnApplicationShutdown { let activeDeliverJobs = 0; let activeInboxJobs = 0; - const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); - const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); + const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config.redisForDeliverQueue, QUEUE.DELIVER)); + const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config.redisForInboxQueue, QUEUE.INBOX)); deliverQueueEvents.on('active', () => { activeDeliverJobs++; diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index b5298aa898..9df372b3ea 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -146,7 +146,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for system`); } }, { - ...baseQueueOptions(this.config, QUEUE.SYSTEM), + ...baseQueueOptions(this.config.redisForSystemQueue, QUEUE.SYSTEM), autorun: false, }); @@ -185,7 +185,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for db`); } }, { - ...baseQueueOptions(this.config, QUEUE.DB), + ...baseQueueOptions(this.config.redisForDbQueue, QUEUE.DB), autorun: false, }); @@ -201,7 +201,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region deliver this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.DELIVER), + ...baseQueueOptions(this.config.redisForDeliverQueue, QUEUE.DELIVER), autorun: false, concurrency: this.config.deliverJobConcurrency ?? 128, limiter: { @@ -225,7 +225,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region inbox this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.INBOX), + ...baseQueueOptions(this.config.redisForInboxQueue, QUEUE.INBOX), autorun: false, concurrency: this.config.inboxJobConcurrency ?? 16, limiter: { @@ -249,7 +249,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region webhook deliver this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER), + ...baseQueueOptions(this.config.redisForWebhookDeliverQueue, QUEUE.WEBHOOK_DELIVER), autorun: false, concurrency: 64, limiter: { @@ -281,7 +281,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for relationship`); } }, { - ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP), + ...baseQueueOptions(this.config.redisForRelationshipQueue, QUEUE.RELATIONSHIP), autorun: false, concurrency: this.config.relashionshipJobConcurrency ?? 16, limiter: { @@ -308,7 +308,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for objectStorage`); } }, { - ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE), + ...baseQueueOptions(this.config.redisForObjectStorageQueue, QUEUE.OBJECT_STORAGE), autorun: false, concurrency: 16, }); @@ -325,7 +325,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region ended poll notification this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), + ...baseQueueOptions(this.config.redisForEndedPollNotificationQueue, QUEUE.ENDED_POLL_NOTIFICATION), autorun: false, }); //#endregion diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index e6ff7a4b57..55869095c7 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -3,8 +3,9 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import { Config } from '@/config.js'; import type * as Bull from 'bullmq'; +import type { RedisOptions } from "ioredis"; +import type { RedisOptionsSource } from '@/config.js'; export const QUEUE = { DELIVER: 'deliver', @@ -17,13 +18,13 @@ export const QUEUE = { WEBHOOK_DELIVER: 'webhookDeliver', }; -export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { +export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { return { connection: { - ...config.redisForJobQueue, + ...config, maxRetriesPerRequest: null, keyPrefix: undefined, }, - prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`, + prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`, }; }