diff --git a/packages/backend/package.json b/packages/backend/package.json index 39282efa08..f88101a32d 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -79,7 +79,7 @@ "autwh": "0.1.0", "bcryptjs": "2.4.3", "blurhash": "2.0.5", - "bull": "4.10.4", + "bullmq": "3.15.0", "cacheable-lookup": "6.1.0", "cbor": "9.0.0", "chalk": "5.2.0", diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 2c7c78f950..1c8491bf57 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -510,7 +510,7 @@ export class NoteCreateService implements OnApplicationShutdown { if (data.poll && data.poll.expiresAt) { const delay = data.poll.expiresAt.getTime() - Date.now(); - this.queueService.endedPollNotificationQueue.add({ + this.queueService.endedPollNotificationQueue.add(note.id, { noteId: note.id, }, { delay, diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index bacc59aae2..3384ca4577 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -1,42 +1,11 @@ import { setTimeout } from 'node:timers/promises'; import { Inject, Module, OnApplicationShutdown } from '@nestjs/common'; -import Bull from 'bull'; +import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; +import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import type { Provider } from '@nestjs/common'; -import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js'; - -function q(config: Config, name: string, limitPerSec = -1) { - return new Bull(name, { - redis: { - port: config.redisForJobQueue.port, - host: config.redisForJobQueue.host, - family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family, - password: config.redisForJobQueue.pass, - db: config.redisForJobQueue.db ?? 0, - }, - prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue` : 'queue', - limiter: limitPerSec > 0 ? { - max: limitPerSec, - duration: 1000, - } : undefined, - settings: { - backoffStrategies: { - apBackoff, - }, - }, - }); -} - -// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 -function apBackoff(attemptsMade: number, err: Error) { - const baseDelay = 60 * 1000; // 1min - const maxBackoff = 8 * 60 * 60 * 1000; // 8hours - let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; - backoff = Math.min(backoff, maxBackoff); - backoff += Math.round(backoff * Math.random() * 0.2); - return backoff; -} +import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js'; export type SystemQueue = Bull.Queue>; export type EndedPollNotificationQueue = Bull.Queue; @@ -49,49 +18,49 @@ export type WebhookDeliverQueue = Bull.Queue; const $system: Provider = { provide: 'queue:system', - useFactory: (config: Config) => q(config, 'system'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)), inject: [DI.config], }; const $endedPollNotification: Provider = { provide: 'queue:endedPollNotification', - useFactory: (config: Config) => q(config, 'endedPollNotification'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)), inject: [DI.config], }; const $deliver: Provider = { provide: 'queue:deliver', - useFactory: (config: Config) => q(config, 'deliver', config.deliverJobPerSec ?? 128), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)), inject: [DI.config], }; const $inbox: Provider = { provide: 'queue:inbox', - useFactory: (config: Config) => q(config, 'inbox', config.inboxJobPerSec ?? 16), + useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)), inject: [DI.config], }; const $db: Provider = { provide: 'queue:db', - useFactory: (config: Config) => q(config, 'db'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)), inject: [DI.config], }; const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64), + useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)), inject: [DI.config], }; const $objectStorage: Provider = { provide: 'queue:objectStorage', - useFactory: (config: Config) => q(config, 'objectStorage'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)), inject: [DI.config], }; const $webhookDeliver: Provider = { provide: 'queue:webhookDeliver', - useFactory: (config: Config) => q(config, 'webhookDeliver', 64), + useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)), inject: [DI.config], }; diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 13d9d71233..c2c88270fd 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -1,6 +1,5 @@ import { Inject, Injectable } from '@nestjs/common'; import { v4 as uuid } from 'uuid'; -import Bull from 'bull'; import type { IActivity } from '@/core/activitypub/type.js'; import type { DriveFile } from '@/models/entities/DriveFile.js'; import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js'; @@ -11,6 +10,7 @@ import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js'; import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js'; import type { DbJobData, DeliverJobData, RelationshipJobData, ThinUser } from '../queue/types.js'; import type httpSignature from '@peertube/http-signature'; +import type * as Bull from 'bullmq'; @Injectable() export class QueueService { @@ -26,7 +26,43 @@ export class QueueService { @Inject('queue:relationship') public relationshipQueue: RelationshipQueue, @Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue, @Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue, - ) {} + ) { + this.systemQueue.add('tickCharts', { + }, { + repeat: { pattern: '55 * * * *' }, + removeOnComplete: true, + }); + + this.systemQueue.add('resyncCharts', { + }, { + repeat: { pattern: '0 0 * * *' }, + removeOnComplete: true, + }); + + this.systemQueue.add('cleanCharts', { + }, { + repeat: { pattern: '0 0 * * *' }, + removeOnComplete: true, + }); + + this.systemQueue.add('aggregateRetention', { + }, { + repeat: { pattern: '0 0 * * *' }, + removeOnComplete: true, + }); + + this.systemQueue.add('clean', { + }, { + repeat: { pattern: '0 0 * * *' }, + removeOnComplete: true, + }); + + this.systemQueue.add('checkExpiredMutings', { + }, { + repeat: { pattern: '*/5 * * * *' }, + removeOnComplete: true, + }); + } @bindThis public deliver(user: ThinUser, content: IActivity | null, to: string | null, isSharedInbox: boolean) { @@ -42,11 +78,10 @@ export class QueueService { isSharedInbox, }; - return this.deliverQueue.add(data, { + return this.deliverQueue.add(to, data, { attempts: this.config.deliverJobMaxAttempts ?? 12, - timeout: 1 * 60 * 1000, // 1min backoff: { - type: 'apBackoff', + type: 'custom', }, removeOnComplete: true, removeOnFail: true, @@ -75,6 +110,7 @@ export class QueueService { }; await this.deliverQueue.addBulk(Array.from(inboxes.entries()).map(d => ({ + name: d[0], data: { user, content, @@ -94,11 +130,10 @@ export class QueueService { signature, }; - return this.inboxQueue.add(data, { + return this.inboxQueue.add('', data, { attempts: this.config.inboxJobMaxAttempts ?? 8, - timeout: 5 * 60 * 1000, // 5min backoff: { - type: 'apBackoff', + type: 'custom', }, removeOnComplete: true, removeOnFail: true, @@ -246,7 +281,7 @@ export class QueueService { private generateToDbJobData>(name: T, data: D): { name: string, data: D, - opts: Bull.JobOptions, + opts: Bull.JobsOptions, } { return { name, @@ -333,10 +368,10 @@ export class QueueService { } @bindThis - private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobOptions = {}): { + private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobsOptions = {}): { name: string, data: RelationshipJobData, - opts: Bull.JobOptions, + opts: Bull.JobsOptions, } { return { name, @@ -385,11 +420,10 @@ export class QueueService { eventId: uuid(), }; - return this.webhookDeliverQueue.add(data, { + return this.webhookDeliverQueue.add(webhook.id, data, { attempts: 4, - timeout: 1 * 60 * 1000, // 1min backoff: { - type: 'apBackoff', + type: 'custom', }, removeOnComplete: true, removeOnFail: true, @@ -401,11 +435,11 @@ export class QueueService { this.deliverQueue.once('cleaned', (jobs, status) => { //deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); - this.deliverQueue.clean(0, 'delayed'); + this.deliverQueue.clean(0, Infinity, 'delayed'); this.inboxQueue.once('cleaned', (jobs, status) => { //inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); - this.inboxQueue.clean(0, 'delayed'); + this.inboxQueue.clean(0, Infinity, 'delayed'); } } diff --git a/packages/backend/src/core/activitypub/models/ApNoteService.ts b/packages/backend/src/core/activitypub/models/ApNoteService.ts index 7f09ae7d33..76757f530a 100644 --- a/packages/backend/src/core/activitypub/models/ApNoteService.ts +++ b/packages/backend/src/core/activitypub/models/ApNoteService.ts @@ -18,6 +18,7 @@ import { PollService } from '@/core/PollService.js'; import { StatusError } from '@/misc/status-error.js'; import { UtilityService } from '@/core/UtilityService.js'; import { bindThis } from '@/decorators.js'; +import { checkHttps } from '@/misc/check-https.js'; import { getOneApId, getApId, getOneApHrefNullable, validPost, isEmoji, getApType } from '../type.js'; // eslint-disable-next-line @typescript-eslint/consistent-type-imports import { ApLoggerService } from '../ApLoggerService.js'; @@ -32,7 +33,6 @@ import { ApQuestionService } from './ApQuestionService.js'; import { ApImageService } from './ApImageService.js'; import type { Resolver } from '../ApResolverService.js'; import type { IObject, IPost } from '../type.js'; -import { checkHttps } from '@/misc/check-https.js'; @Injectable() export class ApNoteService { diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts index f6f26db1fb..53a0d14cd7 100644 --- a/packages/backend/src/daemons/QueueStatsService.ts +++ b/packages/backend/src/daemons/QueueStatsService.ts @@ -1,7 +1,11 @@ -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import Xev from 'xev'; +import * as Bull from 'bullmq'; import { QueueService } from '@/core/QueueService.js'; import { bindThis } from '@/decorators.js'; +import { DI } from '@/di-symbols.js'; +import type { Config } from '@/config.js'; +import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import type { OnApplicationShutdown } from '@nestjs/common'; const ev = new Xev(); @@ -13,6 +17,9 @@ export class QueueStatsService implements OnApplicationShutdown { private intervalId: NodeJS.Timer; constructor( + @Inject(DI.config) + private config: Config, + private queueService: QueueService, ) { } @@ -31,11 +38,14 @@ export class QueueStatsService implements OnApplicationShutdown { let activeDeliverJobs = 0; let activeInboxJobs = 0; - this.queueService.deliverQueue.on('global:active', () => { + const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); + const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); + + deliverQueueEvents.on('active', () => { activeDeliverJobs++; }); - this.queueService.inboxQueue.on('global:active', () => { + inboxQueueEvents.on('active', () => { activeInboxJobs++; }); diff --git a/packages/backend/src/misc/prelude/time.ts b/packages/backend/src/misc/prelude/time.ts index 0da1f79139..b21978b186 100644 --- a/packages/backend/src/misc/prelude/time.ts +++ b/packages/backend/src/misc/prelude/time.ts @@ -5,13 +5,14 @@ const dateTimeIntervals = { }; export function dateUTC(time: number[]): Date { - const d = time.length === 2 ? Date.UTC(time[0], time[1]) - : time.length === 3 ? Date.UTC(time[0], time[1], time[2]) - : time.length === 4 ? Date.UTC(time[0], time[1], time[2], time[3]) - : time.length === 5 ? Date.UTC(time[0], time[1], time[2], time[3], time[4]) - : time.length === 6 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5]) - : time.length === 7 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6]) - : null; + const d = + time.length === 2 ? Date.UTC(time[0], time[1]) + : time.length === 3 ? Date.UTC(time[0], time[1], time[2]) + : time.length === 4 ? Date.UTC(time[0], time[1], time[2], time[3]) + : time.length === 5 ? Date.UTC(time[0], time[1], time[2], time[3], time[4]) + : time.length === 6 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5]) + : time.length === 7 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6]) + : null; if (!d) throw new Error('wrong number of arguments'); diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 566d855308..42f9c1af7d 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -1,10 +1,9 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; +import * as Bull from 'bullmq'; import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; -import { QueueService } from '@/core/QueueService.js'; import { bindThis } from '@/decorators.js'; -import { getJobInfo } from './get-job-info.js'; import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; @@ -35,17 +34,51 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; +import { QUEUE, baseQueueOptions } from './const.js'; + +// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 +function httpRelatedBackoff(attemptsMade: number) { + const baseDelay = 60 * 1000; // 1min + const maxBackoff = 8 * 60 * 60 * 1000; // 8hours + let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; + backoff = Math.min(backoff, maxBackoff); + backoff += Math.round(backoff * Math.random() * 0.2); + return backoff; +} + +function getJobInfo(job: Bull.Job | undefined, increment = false): string { + if (job == null) return '-'; + + const age = Date.now() - job.timestamp; + + const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m` + : age > 10000 ? `${Math.floor(age / 1000)}s` + : `${age}ms`; + + // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする + const currentAttempts = job.attemptsMade + (increment ? 1 : 0); + const maxAttempts = job.opts ? job.opts.attempts : 0; + + return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; +} @Injectable() export class QueueProcessorService implements OnApplicationShutdown { private logger: Logger; + private systemQueueWorker: Bull.Worker; + private dbQueueWorker: Bull.Worker; + private deliverQueueWorker: Bull.Worker; + private inboxQueueWorker: Bull.Worker; + private webhookDeliverQueueWorker: Bull.Worker; + private relationshipQueueWorker: Bull.Worker; + private objectStorageQueueWorker: Bull.Worker; + private endedPollNotificationQueueWorker: Bull.Worker; constructor( @Inject(DI.config) private config: Config, private queueLoggerService: QueueLoggerService, - private queueService: QueueService, private webhookDeliverProcessorService: WebhookDeliverProcessorService, private endedPollNotificationProcessorService: EndedPollNotificationProcessorService, private deliverProcessorService: DeliverProcessorService, @@ -77,10 +110,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private cleanProcessorService: CleanProcessorService, ) { this.logger = this.queueLoggerService.logger; - } - @bindThis - public start(): void { function renderError(e: Error): any { if (e) { // 何故かeがundefinedで来ることがある return { @@ -97,160 +127,227 @@ export class QueueProcessorService implements OnApplicationShutdown { } } - const systemLogger = this.logger.createSubLogger('system'); - const deliverLogger = this.logger.createSubLogger('deliver'); - const webhookLogger = this.logger.createSubLogger('webhook'); - const inboxLogger = this.logger.createSubLogger('inbox'); - const dbLogger = this.logger.createSubLogger('db'); - const relationshipLogger = this.logger.createSubLogger('relationship'); - const objectStorageLogger = this.logger.createSubLogger('objectStorage'); + //#region system + this.systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => { + switch (job.name) { + case 'tickCharts': return this.tickChartsProcessorService.process(); + case 'resyncCharts': return this.resyncChartsProcessorService.process(); + case 'cleanCharts': return this.cleanChartsProcessorService.process(); + case 'aggregateRetention': return this.aggregateRetentionProcessorService.process(); + case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process(); + case 'clean': return this.cleanProcessorService.process(); + default: throw new Error(`unrecognized job type ${job.name} for system`); + } + }, { + ...baseQueueOptions(this.config, QUEUE.SYSTEM), + autorun: false, + }); - this.queueService.systemQueue - .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`)) + const systemLogger = this.logger.createSubLogger('system'); + + this.systemQueueWorker .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`)); + .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => systemLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.deliverQueue - .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`)) - .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); + //#region db + this.dbQueueWorker = new Bull.Worker(QUEUE.DB, (job) => { + switch (job.name) { + case 'deleteDriveFiles': return this.deleteDriveFilesProcessorService.process(job); + case 'exportCustomEmojis': return this.exportCustomEmojisProcessorService.process(job); + case 'exportNotes': return this.exportNotesProcessorService.process(job); + case 'exportFavorites': return this.exportFavoritesProcessorService.process(job); + case 'exportFollowing': return this.exportFollowingProcessorService.process(job); + case 'exportMuting': return this.exportMutingProcessorService.process(job); + case 'exportBlocking': return this.exportBlockingProcessorService.process(job); + case 'exportUserLists': return this.exportUserListsProcessorService.process(job); + case 'exportAntennas': return this.exportAntennasProcessorService.process(job); + case 'importFollowing': return this.importFollowingProcessorService.process(job); + case 'importFollowingToDb': return this.importFollowingProcessorService.processDb(job); + case 'importMuting': return this.importMutingProcessorService.process(job); + case 'importBlocking': return this.importBlockingProcessorService.process(job); + case 'importBlockingToDb': return this.importBlockingProcessorService.processDb(job); + case 'importUserLists': return this.importUserListsProcessorService.process(job); + case 'importCustomEmojis': return this.importCustomEmojisProcessorService.process(job); + case 'importAntennas': return this.importAntennasProcessorService.process(job); + case 'deleteAccount': return this.deleteAccountProcessorService.process(job); + default: throw new Error(`unrecognized job type ${job.name} for db`); + } + }, { + ...baseQueueOptions(this.config, QUEUE.DB), + autorun: false, + }); - this.queueService.inboxQueue - .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) - .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) - .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`)); + const dbLogger = this.logger.createSubLogger('db'); - this.queueService.dbQueue - .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`)) + this.dbQueueWorker .on('active', (job) => dbLogger.debug(`active id=${job.id}`)) .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`)); + .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => dbLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.relationshipQueue - .on('waiting', (jobId) => relationshipLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => relationshipLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => relationshipLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => relationshipLogger.warn(`stalled id=${job.id}`)); + //#region deliver + this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.DELIVER), + autorun: false, + concurrency: this.config.deliverJobConcurrency ?? 128, + limiter: { + max: this.config.deliverJobPerSec ?? 128, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); - this.queueService.objectStorageQueue - .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`)); + const deliverLogger = this.logger.createSubLogger('deliver'); - this.queueService.webhookDeliverQueue - .on('waiting', (jobId) => webhookLogger.debug(`waiting id=${jobId}`)) + this.deliverQueueWorker + .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) + .on('error', (err: Error) => deliverLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`)); + //#endregion + + //#region inbox + this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.INBOX), + autorun: false, + concurrency: this.config.inboxJobConcurrency ?? 16, + limiter: { + max: this.config.inboxJobPerSec ?? 16, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); + + const inboxLogger = this.logger.createSubLogger('inbox'); + + this.inboxQueueWorker + .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => inboxLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); + //#endregion + + //#region webhook deliver + this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER), + autorun: false, + concurrency: 64, + limiter: { + max: 64, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); + + const webhookLogger = this.logger.createSubLogger('webhook'); + + this.webhookDeliverQueueWorker .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`)) - .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); + .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) + .on('error', (err: Error) => webhookLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.systemQueue.add('tickCharts', { + //#region relationship + this.relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { + switch (job.name) { + case 'follow': return this.relationshipProcessorService.processFollow(job); + case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); + case 'block': return this.relationshipProcessorService.processBlock(job); + case 'unblock': return this.relationshipProcessorService.processUnblock(job); + default: throw new Error(`unrecognized job type ${job.name} for relationship`); + } }, { - repeat: { cron: '55 * * * *' }, - removeOnComplete: true, + ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP), + autorun: false, + concurrency: this.config.relashionshipJobConcurrency ?? 16, + limiter: { + max: this.config.relashionshipJobPerSec ?? 64, + duration: 1000, + }, }); - this.queueService.systemQueue.add('resyncCharts', { - }, { - repeat: { cron: '0 0 * * *' }, - removeOnComplete: true, - }); - - this.queueService.systemQueue.add('cleanCharts', { - }, { - repeat: { cron: '0 0 * * *' }, - removeOnComplete: true, - }); - - this.queueService.systemQueue.add('aggregateRetention', { - }, { - repeat: { cron: '0 0 * * *' }, - removeOnComplete: true, - }); - - this.queueService.systemQueue.add('clean', { - }, { - repeat: { cron: '0 0 * * *' }, - removeOnComplete: true, - }); - - this.queueService.systemQueue.add('checkExpiredMutings', { - }, { - repeat: { cron: '*/5 * * * *' }, - removeOnComplete: true, - }); - - this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job)); - this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job)); - this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done)); - this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job)); - - this.queueService.dbQueue.process('deleteDriveFiles', (job, done) => this.deleteDriveFilesProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportCustomEmojis', (job, done) => this.exportCustomEmojisProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportNotes', (job, done) => this.exportNotesProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportFavorites', (job, done) => this.exportFavoritesProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportFollowing', (job, done) => this.exportFollowingProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportMuting', (job, done) => this.exportMutingProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportBlocking', (job, done) => this.exportBlockingProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportUserLists', (job, done) => this.exportUserListsProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportAntennas', (job, done) => this.exportAntennasProcessorService.process(job, done)); - this.queueService.dbQueue.process('importFollowing', (job, done) => this.importFollowingProcessorService.process(job, done)); - this.queueService.dbQueue.process('importFollowingToDb', (job) => this.importFollowingProcessorService.processDb(job)); - this.queueService.dbQueue.process('importMuting', (job, done) => this.importMutingProcessorService.process(job, done)); - this.queueService.dbQueue.process('importBlocking', (job, done) => this.importBlockingProcessorService.process(job, done)); - this.queueService.dbQueue.process('importBlockingToDb', (job) => this.importBlockingProcessorService.processDb(job)); - this.queueService.dbQueue.process('importUserLists', (job, done) => this.importUserListsProcessorService.process(job, done)); - this.queueService.dbQueue.process('importCustomEmojis', (job, done) => this.importCustomEmojisProcessorService.process(job, done)); - this.queueService.dbQueue.process('importAntennas', (job, done) => this.importAntennasProcessorService.process(job, done)); - this.queueService.dbQueue.process('deleteAccount', (job) => this.deleteAccountProcessorService.process(job)); - - this.queueService.objectStorageQueue.process('deleteFile', 16, (job) => this.deleteFileProcessorService.process(job)); - this.queueService.objectStorageQueue.process('cleanRemoteFiles', 16, (job, done) => this.cleanRemoteFilesProcessorService.process(job, done)); + const relationshipLogger = this.logger.createSubLogger('relationship'); - { - const maxJobs = this.config.relashionshipJobConcurrency ?? 16; - this.queueService.relationshipQueue.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job)); - this.queueService.relationshipQueue.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job)); - this.queueService.relationshipQueue.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job)); - this.queueService.relationshipQueue.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job)); - } + this.relationshipQueueWorker + .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => relationshipLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => relationshipLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.systemQueue.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done)); - this.queueService.systemQueue.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done)); - this.queueService.systemQueue.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done)); - this.queueService.systemQueue.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done)); - this.queueService.systemQueue.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done)); - this.queueService.systemQueue.process('clean', (job, done) => this.cleanProcessorService.process(job, done)); + //#region object storage + this.objectStorageQueueWorker = new Bull.Worker(QUEUE.OBJECT_STORAGE, (job) => { + switch (job.name) { + case 'deleteFile': return this.deleteFileProcessorService.process(job); + case 'cleanRemoteFiles': return this.cleanRemoteFilesProcessorService.process(job); + default: throw new Error(`unrecognized job type ${job.name} for objectStorage`); + } + }, { + ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE), + autorun: false, + concurrency: 16, + }); + + const objectStorageLogger = this.logger.createSubLogger('objectStorage'); + + this.objectStorageQueueWorker + .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => objectStorageLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`)); + //#endregion + + //#region ended poll notification + this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), + autorun: false, + }); + //#endregion + } + + @bindThis + public async start(): Promise { + await Promise.all([ + this.systemQueueWorker.run(), + this.dbQueueWorker.run(), + this.deliverQueueWorker.run(), + this.inboxQueueWorker.run(), + this.webhookDeliverQueueWorker.run(), + this.relationshipQueueWorker.run(), + this.objectStorageQueueWorker.run(), + this.endedPollNotificationQueueWorker.run(), + ]); } @bindThis public async stop(): Promise { await Promise.all([ - this.queueService.systemQueue.close(false), - this.queueService.endedPollNotificationQueue.close(false), - this.queueService.deliverQueue.close(false), - this.queueService.inboxQueue.close(false), - this.queueService.dbQueue.close(false), - this.queueService.relationshipQueue.close(false), - this.queueService.objectStorageQueue.close(false), - this.queueService.webhookDeliverQueue.close(false), + this.systemQueueWorker.close(), + this.dbQueueWorker.close(), + this.deliverQueueWorker.close(), + this.inboxQueueWorker.close(), + this.webhookDeliverQueueWorker.close(), + this.relationshipQueueWorker.close(), + this.objectStorageQueueWorker.close(), + this.endedPollNotificationQueueWorker.close(), ]); } diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts new file mode 100644 index 0000000000..d240fe70e0 --- /dev/null +++ b/packages/backend/src/queue/const.ts @@ -0,0 +1,26 @@ +import { Config } from '@/config.js'; +import type * as Bull from 'bullmq'; + +export const QUEUE = { + DELIVER: 'deliver', + INBOX: 'inbox', + SYSTEM: 'system', + ENDED_POLL_NOTIFICATION: 'endedPollNotification', + DB: 'db', + RELATIONSHIP: 'relationship', + OBJECT_STORAGE: 'objectStorage', + WEBHOOK_DELIVER: 'webhookDeliver', +}; + +export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { + return { + connection: { + port: config.redisForJobQueue.port, + host: config.redisForJobQueue.host, + family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family, + password: config.redisForJobQueue.pass, + db: config.redisForJobQueue.db ?? 0, + }, + prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`, + }; +} diff --git a/packages/backend/src/queue/get-job-info.ts b/packages/backend/src/queue/get-job-info.ts deleted file mode 100644 index d33e349c36..0000000000 --- a/packages/backend/src/queue/get-job-info.ts +++ /dev/null @@ -1,15 +0,0 @@ -import Bull from 'bull'; - -export function getJobInfo(job: Bull.Job, increment = false) { - const age = Date.now() - job.timestamp; - - const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m` - : age > 10000 ? `${Math.floor(age / 1000)}s` - : `${age}ms`; - - // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする - const currentAttempts = job.attemptsMade + (increment ? 1 : 0); - const maxAttempts = job.opts ? job.opts.attempts : 0; - - return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; -} diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts index e2720b4fe0..600ce0828f 100644 --- a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts +++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts @@ -9,7 +9,7 @@ import { deepClone } from '@/misc/clone.js'; import { IdService } from '@/core/IdService.js'; import { isDuplicateKeyValueError } from '@/misc/is-duplicate-key-value-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; @Injectable() export class AggregateRetentionProcessorService { @@ -32,7 +32,7 @@ export class AggregateRetentionProcessorService { } @bindThis - public async process(job: Bull.Job>, done: () => void): Promise { + public async process(): Promise { this.logger.info('Aggregating retention...'); const now = new Date(); @@ -62,7 +62,6 @@ export class AggregateRetentionProcessorService { } catch (err) { if (isDuplicateKeyValueError(err)) { this.logger.succ('Skip because it has already been processed by another worker.'); - done(); return; } throw err; @@ -88,6 +87,5 @@ export class AggregateRetentionProcessorService { } this.logger.succ('Retention aggregated.'); - done(); } } diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts index 2476d71a5e..c4ee212bab 100644 --- a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts @@ -7,7 +7,7 @@ import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; import { UserMutingService } from '@/core/UserMutingService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; @Injectable() export class CheckExpiredMutingsProcessorService { @@ -27,7 +27,7 @@ export class CheckExpiredMutingsProcessorService { } @bindThis - public async process(job: Bull.Job>, done: () => void): Promise { + public async process(): Promise { this.logger.info('Checking expired mutings...'); const expired = await this.mutingsRepository.createQueryBuilder('muting') @@ -41,6 +41,5 @@ export class CheckExpiredMutingsProcessorService { } this.logger.succ('All expired mutings checked.'); - done(); } } diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts index b458167042..22d7c1b4fb 100644 --- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts @@ -16,7 +16,7 @@ import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js'; import ApRequestChart from '@/core/chart/charts/ap-request.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; @Injectable() export class CleanChartsProcessorService { @@ -45,7 +45,7 @@ export class CleanChartsProcessorService { } @bindThis - public async process(job: Bull.Job>, done: () => void): Promise { + public async process(): Promise { this.logger.info('Clean charts...'); await Promise.all([ @@ -64,6 +64,5 @@ export class CleanChartsProcessorService { ]); this.logger.succ('All charts successfully cleaned.'); - done(); } } diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts index 1936e8df23..cefa6da5e9 100644 --- a/packages/backend/src/queue/processors/CleanProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanProcessorService.ts @@ -7,7 +7,7 @@ import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; import { IdService } from '@/core/IdService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; @Injectable() export class CleanProcessorService { @@ -36,7 +36,7 @@ export class CleanProcessorService { } @bindThis - public async process(job: Bull.Job>, done: () => void): Promise { + public async process(): Promise { this.logger.info('Cleaning...'); this.userIpsRepository.delete({ @@ -72,6 +72,5 @@ export class CleanProcessorService { } this.logger.succ('Cleaned.'); - done(); } } diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts index 5a33c27188..c54bf59ae4 100644 --- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts @@ -5,9 +5,9 @@ import type { DriveFilesRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; @Injectable() export class CleanRemoteFilesProcessorService { @@ -27,7 +27,7 @@ export class CleanRemoteFilesProcessorService { } @bindThis - public async process(job: Bull.Job>, done: () => void): Promise { + public async process(job: Bull.Job>): Promise { this.logger.info('Deleting cached remote files...'); let deletedCount = 0; @@ -47,7 +47,7 @@ export class CleanRemoteFilesProcessorService { }); if (files.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -62,10 +62,9 @@ export class CleanRemoteFilesProcessorService { isLink: false, }); - job.progress(deletedCount / total); + job.updateProgress(deletedCount / total); } this.logger.succ('All cached remote files has been deleted.'); - done(); } } diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index e36a78de6a..39dd801af0 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -8,10 +8,10 @@ import { DriveService } from '@/core/DriveService.js'; import type { DriveFile } from '@/models/entities/DriveFile.js'; import type { Note } from '@/models/entities/Note.js'; import { EmailService } from '@/core/EmailService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbUserDeleteJobData } from '../types.js'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbUserDeleteJobData } from '../types.js'; @Injectable() export class DeleteAccountProcessorService { diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts index 604497cf54..6772c5dc76 100644 --- a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts @@ -5,10 +5,10 @@ import type { UsersRepository, DriveFilesRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbJobDataWithUser } from '../types.js'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbJobDataWithUser } from '../types.js'; @Injectable() export class DeleteDriveFilesProcessorService { @@ -31,12 +31,11 @@ export class DeleteDriveFilesProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Deleting drive files of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -56,7 +55,7 @@ export class DeleteDriveFilesProcessorService { }); if (files.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -71,10 +70,9 @@ export class DeleteDriveFilesProcessorService { userId: user.id, }); - job.progress(deletedCount / total); + job.updateProgress(deletedCount / total); } this.logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); - done(); } } diff --git a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts index 2fb2f56f8d..edf87bd921 100644 --- a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts @@ -3,10 +3,10 @@ import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { ObjectStorageFileJobData } from '../types.js'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { ObjectStorageFileJobData } from '../types.js'; @Injectable() export class DeleteFileProcessorService { diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 380671302d..406e9df850 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -1,4 +1,5 @@ import { Inject, Injectable } from '@nestjs/common'; +import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; import type { DriveFilesRepository, InstancesRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; @@ -16,7 +17,6 @@ import { StatusError } from '@/misc/status-error.js'; import { UtilityService } from '@/core/UtilityService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; import type { DeliverJobData } from '../types.js'; @Injectable() @@ -121,11 +121,9 @@ export class DeliverProcessorService { isSuspended: true, }); }); - return `${host} is gone`; + throw new Bull.UnrecoverableError(`${host} is gone`); } - // HTTPステータスコード4xxはクライアントエラーであり、それはつまり - // 何回再送しても成功することはないということなのでエラーにはしないでおく - return `${res.statusCode} ${res.statusMessage}`; + throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } // 5xx etc. diff --git a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts index 501ed4090a..21501592f2 100644 --- a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts +++ b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts @@ -6,7 +6,7 @@ import type Logger from '@/logger.js'; import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; import type { EndedPollNotificationJobData } from '../types.js'; @Injectable() @@ -30,10 +30,9 @@ export class EndedPollNotificationProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { const note = await this.notesRepository.findOneBy({ id: job.data.noteId }); if (note == null || !note.hasPoll) { - done(); return; } @@ -51,7 +50,5 @@ export class EndedPollNotificationProcessorService { noteId: note.id, }); } - - done(); } } diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts index 894903e79b..ac52325c8d 100644 --- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts @@ -12,7 +12,7 @@ import { createTemp } from '@/misc/create-temp.js'; import { UtilityService } from '@/core/UtilityService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { DBExportAntennasData } from '../types.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; @Injectable() export class ExportAntennasProcessorService { @@ -39,10 +39,9 @@ export class ExportAntennasProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } const [path, cleanup] = await createTemp(); @@ -96,7 +95,6 @@ export class ExportAntennasProcessorService { this.logger.succ('Exported to: ' + driveFile.id); } finally { cleanup(); - done(); } } } diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts index c7b54070d6..eb758e162d 100644 --- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts @@ -9,10 +9,10 @@ import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import { UtilityService } from '@/core/UtilityService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbJobDataWithUser } from '../types.js'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbJobDataWithUser } from '../types.js'; @Injectable() export class ExportBlockingProcessorService { @@ -36,12 +36,11 @@ export class ExportBlockingProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Exporting blocking of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -69,7 +68,7 @@ export class ExportBlockingProcessorService { }); if (blockings.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -99,7 +98,7 @@ export class ExportBlockingProcessorService { blockerId: user.id, }); - job.progress(exportedCount / total); + job.updateProgress(exportedCount / total); } stream.end(); @@ -112,7 +111,5 @@ export class ExportBlockingProcessorService { } finally { cleanup(); } - - done(); } } diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts index b50f373ef8..3203d9f3e5 100644 --- a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts @@ -13,7 +13,7 @@ import { createTemp, createTempDir } from '@/misc/create-temp.js'; import { DownloadService } from '@/core/DownloadService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; @Injectable() export class ExportCustomEmojisProcessorService { @@ -37,12 +37,11 @@ export class ExportCustomEmojisProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info('Exporting custom emojis ...'); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -117,24 +116,26 @@ export class ExportCustomEmojisProcessorService { metaStream.end(); // Create archive - const [archivePath, archiveCleanup] = await createTemp(); - const archiveStream = fs.createWriteStream(archivePath); - const archive = archiver('zip', { - zlib: { level: 0 }, - }); - archiveStream.on('close', async () => { - this.logger.succ(`Exported to: ${archivePath}`); + await new Promise(async (resolve) => { + const [archivePath, archiveCleanup] = await createTemp(); + const archiveStream = fs.createWriteStream(archivePath); + const archive = archiver('zip', { + zlib: { level: 0 }, + }); + archiveStream.on('close', async () => { + this.logger.succ(`Exported to: ${archivePath}`); - const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip'; - const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true }); + const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip'; + const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true }); - this.logger.succ(`Exported to: ${driveFile.id}`); - cleanup(); - archiveCleanup(); - done(); + this.logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + archiveCleanup(); + resolve(); + }); + archive.pipe(archiveStream); + archive.directory(path, false); + archive.finalize(); }); - archive.pipe(archiveStream); - archive.directory(path, false); - archive.finalize(); } } diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts index f2f2383a88..76c38a6b86 100644 --- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts @@ -12,7 +12,7 @@ import type { Poll } from '@/models/entities/Poll.js'; import type { Note } from '@/models/entities/Note.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; @Injectable() @@ -42,12 +42,11 @@ export class ExportFavoritesProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Exporting favorites of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -91,7 +90,7 @@ export class ExportFavoritesProcessorService { }) as (NoteFavorite & { note: Note & { user: User } })[]; if (favorites.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -112,7 +111,7 @@ export class ExportFavoritesProcessorService { userId: user.id, }); - job.progress(exportedFavoritesCount / total); + job.updateProgress(exportedFavoritesCount / total); } await write(']'); @@ -127,8 +126,6 @@ export class ExportFavoritesProcessorService { } finally { cleanup(); } - - done(); } } diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts index fa9c1ac1ea..8726cb1402 100644 --- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts @@ -10,10 +10,10 @@ import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import type { Following } from '@/models/entities/Following.js'; import { UtilityService } from '@/core/UtilityService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbExportFollowingData } from '../types.js'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbExportFollowingData } from '../types.js'; @Injectable() export class ExportFollowingProcessorService { @@ -40,12 +40,11 @@ export class ExportFollowingProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Exporting following of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -116,7 +115,5 @@ export class ExportFollowingProcessorService { } finally { cleanup(); } - - done(); } } diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts index b14bf5f5b1..0f11a9e843 100644 --- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts @@ -9,10 +9,10 @@ import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import { UtilityService } from '@/core/UtilityService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbJobDataWithUser } from '../types.js'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbJobDataWithUser } from '../types.js'; @Injectable() export class ExportMutingProcessorService { @@ -39,12 +39,11 @@ export class ExportMutingProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Exporting muting of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -73,7 +72,7 @@ export class ExportMutingProcessorService { }); if (mutes.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -103,7 +102,7 @@ export class ExportMutingProcessorService { muterId: user.id, }); - job.progress(exportedCount / total); + job.updateProgress(exportedCount / total); } stream.end(); @@ -116,7 +115,5 @@ export class ExportMutingProcessorService { } finally { cleanup(); } - - done(); } } diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts index e4f12ad101..24fb331883 100644 --- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts @@ -12,7 +12,7 @@ import type { Poll } from '@/models/entities/Poll.js'; import type { Note } from '@/models/entities/Note.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; @Injectable() @@ -39,12 +39,11 @@ export class ExportNotesProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Exporting notes of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -87,7 +86,7 @@ export class ExportNotesProcessorService { }) as Note[]; if (notes.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -108,7 +107,7 @@ export class ExportNotesProcessorService { userId: user.id, }); - job.progress(exportedNotesCount / total); + job.updateProgress(exportedNotesCount / total); } await write(']'); @@ -123,8 +122,6 @@ export class ExportNotesProcessorService { } finally { cleanup(); } - - done(); } } diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts index 54bde44044..ec63358053 100644 --- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts @@ -9,10 +9,10 @@ import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import { UtilityService } from '@/core/UtilityService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbJobDataWithUser } from '../types.js'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbJobDataWithUser } from '../types.js'; @Injectable() export class ExportUserListsProcessorService { @@ -39,12 +39,11 @@ export class ExportUserListsProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Exporting user lists of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -92,7 +91,5 @@ export class ExportUserListsProcessorService { } finally { cleanup(); } - - done(); } } diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts index d06131b8c8..575cad69d5 100644 --- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts @@ -8,7 +8,7 @@ import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import { DBAntennaImportJobData } from '../types.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; const validate = new Ajv().compile({ type: 'object', @@ -59,7 +59,7 @@ export class ImportAntennasProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { const now = new Date(); try { for (const antenna of job.data.antenna) { @@ -89,8 +89,6 @@ export class ImportAntennasProcessorService { } } catch (err: any) { this.logger.error(err); - } finally { - done(); } } } diff --git a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts index c34ab9e36e..2f1a9e5b03 100644 --- a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts @@ -7,11 +7,11 @@ import * as Acct from '@/misc/acct.js'; import { RemoteUserResolveService } from '@/core/RemoteUserResolveService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { UtilityService } from '@/core/UtilityService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbUserImportJobData, DbUserImportToDbJobData } from '../types.js'; import { bindThis } from '@/decorators.js'; import { QueueService } from '@/core/QueueService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbUserImportJobData, DbUserImportToDbJobData } from '../types.js'; @Injectable() export class ImportBlockingProcessorService { @@ -34,12 +34,11 @@ export class ImportBlockingProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Importing blocking of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -47,7 +46,6 @@ export class ImportBlockingProcessorService { id: job.data.fileId, }); if (file == null) { - done(); return; } @@ -56,7 +54,6 @@ export class ImportBlockingProcessorService { this.queueService.createImportBlockingToDbJob({ id: user.id }, targets); this.logger.succ('Import jobs created'); - done(); } @bindThis diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index 600468a286..d862567871 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -12,7 +12,7 @@ import { DriveService } from '@/core/DriveService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; import type { DbUserImportJobData } from '../types.js'; // TODO: 名前衝突時の動作を選べるようにする @@ -45,14 +45,13 @@ export class ImportCustomEmojisProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info('Importing custom emojis ...'); const file = await this.driveFilesRepository.findOneBy({ id: job.data.fileId, }); if (file == null) { - done(); return; } @@ -116,7 +115,6 @@ export class ImportCustomEmojisProcessorService { cleanup(); this.logger.succ('Imported'); - done(); }); unzipStream.pipe(extractor); this.logger.succ(`Unzipping to ${outputPath}`); diff --git a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts index 2f2e9ada99..15bee9672e 100644 --- a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts @@ -7,11 +7,11 @@ import * as Acct from '@/misc/acct.js'; import { RemoteUserResolveService } from '@/core/RemoteUserResolveService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { UtilityService } from '@/core/UtilityService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbUserImportJobData, DbUserImportToDbJobData } from '../types.js'; import { bindThis } from '@/decorators.js'; import { QueueService } from '@/core/QueueService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbUserImportJobData, DbUserImportToDbJobData } from '../types.js'; @Injectable() export class ImportFollowingProcessorService { @@ -34,12 +34,11 @@ export class ImportFollowingProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Importing following of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -47,7 +46,6 @@ export class ImportFollowingProcessorService { id: job.data.fileId, }); if (file == null) { - done(); return; } @@ -56,7 +54,6 @@ export class ImportFollowingProcessorService { this.queueService.createImportFollowingToDbJob({ id: user.id }, targets); this.logger.succ('Import jobs created'); - done(); } @bindThis diff --git a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts index d9ca096176..723935cd31 100644 --- a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts @@ -9,10 +9,10 @@ import { RemoteUserResolveService } from '@/core/RemoteUserResolveService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { UserMutingService } from '@/core/UserMutingService.js'; import { UtilityService } from '@/core/UtilityService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; -import type { DbUserImportJobData } from '../types.js'; import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbUserImportJobData } from '../types.js'; @Injectable() export class ImportMutingProcessorService { @@ -38,12 +38,11 @@ export class ImportMutingProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Importing muting of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -51,7 +50,6 @@ export class ImportMutingProcessorService { id: job.data.fileId, }); if (file == null) { - done(); return; } @@ -98,6 +96,5 @@ export class ImportMutingProcessorService { } this.logger.succ('Imported'); - done(); } } diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts index c423863410..824ee8157a 100644 --- a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts @@ -12,7 +12,7 @@ import { IdService } from '@/core/IdService.js'; import { UtilityService } from '@/core/UtilityService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; import type { DbUserImportJobData } from '../types.js'; @Injectable() @@ -46,12 +46,11 @@ export class ImportUserListsProcessorService { } @bindThis - public async process(job: Bull.Job, done: () => void): Promise { + public async process(job: Bull.Job): Promise { this.logger.info(`Importing user lists of ${job.data.user.id} ...`); const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); return; } @@ -59,7 +58,6 @@ export class ImportUserListsProcessorService { id: job.data.fileId, }); if (file == null) { - done(); return; } @@ -109,6 +107,5 @@ export class ImportUserListsProcessorService { } this.logger.succ('Imported'); - done(); } } diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 535a806d7f..ce1d7aaa1b 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -1,8 +1,8 @@ import { URL } from 'node:url'; import { Inject, Injectable } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; +import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; -import type { InstancesRepository, DriveFilesRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { MetaService } from '@/core/MetaService.js'; @@ -23,10 +23,8 @@ import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; import type { InboxJobData } from '../types.js'; -// ユーザーのinboxにアクティビティが届いた時の処理 @Injectable() export class InboxProcessorService { private logger: Logger; @@ -35,12 +33,6 @@ export class InboxProcessorService { @Inject(DI.config) private config: Config, - @Inject(DI.instancesRepository) - private instancesRepository: InstancesRepository, - - @Inject(DI.driveFilesRepository) - private driveFilesRepository: DriveFilesRepository, - private utilityService: UtilityService, private metaService: MetaService, private apInboxService: ApInboxService, @@ -93,10 +85,10 @@ export class InboxProcessorService { try { authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor)); } catch (err) { - // 対象が4xxならスキップ + // 対象が4xxならスキップ if (err instanceof StatusError) { if (err.isClientError) { - return `skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`; + throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`); } throw new Error(`Error in actor ${activity.actor} - ${err.statusCode ?? err}`); } @@ -105,12 +97,12 @@ export class InboxProcessorService { // それでもわからなければ終了 if (authUser == null) { - return 'skip: failed to resolve user'; + throw new Bull.UnrecoverableError('skip: failed to resolve user'); } // publicKey がなくても終了 if (authUser.key == null) { - return 'skip: failed to resolve user publicKey'; + throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey'); } // HTTP-Signatureの検証 @@ -118,10 +110,10 @@ export class InboxProcessorService { // また、signatureのsignerは、activity.actorと一致する必要がある if (!httpSignatureValidated || authUser.user.uri !== activity.actor) { - // 一致しなくても、でもLD-Signatureがありそうならそっちも見る + // 一致しなくても、でもLD-Signatureがありそうならそっちも見る if (activity.signature) { if (activity.signature.type !== 'RsaSignature2017') { - return `skip: unsupported LD-signature type ${activity.signature.type}`; + throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${activity.signature.type}`); } // activity.signature.creator: https://example.oom/users/user#main-key @@ -134,32 +126,32 @@ export class InboxProcessorService { // keyIdからLD-Signatureのユーザーを取得 authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator); if (authUser == null) { - return 'skip: LD-Signatureのユーザーが取得できませんでした'; + throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした'); } if (authUser.key == null) { - return 'skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした'; + throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした'); } // LD-Signature検証 const ldSignature = this.ldSignatureService.use(); const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); if (!verified) { - return 'skip: LD-Signatureの検証に失敗しました'; + throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました'); } // もう一度actorチェック if (authUser.user.uri !== activity.actor) { - return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`; + throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`); } // ブロックしてたら中断 const ldHost = this.utilityService.extractDbHost(authUser.user.uri); if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) { - return `Blocked request: ${ldHost}`; + throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`); } } else { - return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`; + throw new Bull.UnrecoverableError(`skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`); } } @@ -168,7 +160,7 @@ export class InboxProcessorService { const signerHost = this.utilityService.extractDbHost(authUser.user.uri!); const activityIdHost = this.utilityService.extractDbHost(activity.id); if (signerHost !== activityIdHost) { - return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`; + throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`); } } diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts index ff454df455..722260d948 100644 --- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts +++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts @@ -1,5 +1,5 @@ import { Inject, Injectable } from '@nestjs/common'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; import { UserFollowingService } from '@/core/UserFollowingService.js'; import { UserBlockingService } from '@/core/UserBlockingService.js'; diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts index e5840f3da8..eab8e1e68d 100644 --- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts @@ -15,7 +15,7 @@ import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js'; import ApRequestChart from '@/core/chart/charts/ap-request.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; @Injectable() export class ResyncChartsProcessorService { @@ -43,7 +43,7 @@ export class ResyncChartsProcessorService { } @bindThis - public async process(job: Bull.Job>, done: () => void): Promise { + public async process(): Promise { this.logger.info('Resync charts...'); // TODO: ユーザーごとのチャートも更新する @@ -55,6 +55,5 @@ export class ResyncChartsProcessorService { ]); this.logger.succ('All charts successfully resynced.'); - done(); } } diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts index 7ff84c15a5..f1696bf567 100644 --- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts @@ -16,7 +16,7 @@ import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js'; import ApRequestChart from '@/core/chart/charts/ap-request.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; +import type * as Bull from 'bullmq'; @Injectable() export class TickChartsProcessorService { @@ -45,7 +45,7 @@ export class TickChartsProcessorService { } @bindThis - public async process(job: Bull.Job>, done: () => void): Promise { + public async process(): Promise { this.logger.info('Tick charts...'); await Promise.all([ @@ -64,6 +64,5 @@ export class TickChartsProcessorService { ]); this.logger.succ('All charts successfully ticked.'); - done(); } } diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts index a96301f5be..8b40c16749 100644 --- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts @@ -1,4 +1,5 @@ import { Inject, Injectable } from '@nestjs/common'; +import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; import type { WebhooksRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; @@ -7,7 +8,6 @@ import { HttpRequestService } from '@/core/HttpRequestService.js'; import { StatusError } from '@/misc/status-error.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; import type { WebhookDeliverJobData } from '../types.js'; @Injectable() @@ -66,7 +66,7 @@ export class WebhookDeliverProcessorService { if (res instanceof StatusError) { // 4xx if (res.isClientError) { - return `${res.statusCode} ${res.statusMessage}`; + throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } // 5xx etc. diff --git a/packages/frontend/src/scripts/time.ts b/packages/frontend/src/scripts/time.ts index 0da1f79139..b21978b186 100644 --- a/packages/frontend/src/scripts/time.ts +++ b/packages/frontend/src/scripts/time.ts @@ -5,13 +5,14 @@ const dateTimeIntervals = { }; export function dateUTC(time: number[]): Date { - const d = time.length === 2 ? Date.UTC(time[0], time[1]) - : time.length === 3 ? Date.UTC(time[0], time[1], time[2]) - : time.length === 4 ? Date.UTC(time[0], time[1], time[2], time[3]) - : time.length === 5 ? Date.UTC(time[0], time[1], time[2], time[3], time[4]) - : time.length === 6 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5]) - : time.length === 7 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6]) - : null; + const d = + time.length === 2 ? Date.UTC(time[0], time[1]) + : time.length === 3 ? Date.UTC(time[0], time[1], time[2]) + : time.length === 4 ? Date.UTC(time[0], time[1], time[2], time[3]) + : time.length === 5 ? Date.UTC(time[0], time[1], time[2], time[3], time[4]) + : time.length === 6 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5]) + : time.length === 7 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6]) + : null; if (!d) throw new Error('wrong number of arguments'); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0ed97ba293..9151b5cd93 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -152,9 +152,9 @@ importers: blurhash: specifier: 2.0.5 version: 2.0.5 - bull: - specifier: 4.10.4 - version: 4.10.4 + bullmq: + specifier: 3.15.0 + version: 3.15.0 cacheable-lookup: specifier: 6.1.0 version: 6.1.0 @@ -2116,7 +2116,7 @@ packages: '@babel/traverse': 7.22.4 '@babel/types': 7.22.4 convert-source-map: 1.9.0 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) gensync: 1.0.0-beta.2 json5: 2.2.3 semver: 6.3.0 @@ -2139,7 +2139,7 @@ packages: '@babel/traverse': 7.22.4 '@babel/types': 7.22.4 convert-source-map: 1.9.0 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) gensync: 1.0.0-beta.2 json5: 2.2.3 semver: 6.3.0 @@ -4325,7 +4325,7 @@ packages: '@babel/helper-split-export-declaration': 7.18.6 '@babel/parser': 7.22.4 '@babel/types': 7.22.4 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -4754,7 +4754,7 @@ packages: engines: {node: ^12.22.0 || ^14.17.0 || >=16.0.0} dependencies: ajv: 6.12.6 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) espree: 9.5.2 globals: 13.19.0 ignore: 5.2.4 @@ -4919,7 +4919,7 @@ packages: engines: {node: '>=10.10.0'} dependencies: '@humanwhocodes/object-schema': 1.2.1 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) minimatch: 3.1.2 transitivePeerDependencies: - supports-color @@ -5345,48 +5345,48 @@ packages: os-filter-obj: 2.0.0 dev: false - /@msgpackr-extract/msgpackr-extract-darwin-arm64@2.2.0: - resolution: {integrity: sha512-Z9LFPzfoJi4mflGWV+rv7o7ZbMU5oAU9VmzCgL240KnqDW65Y2HFCT3MW06/ITJSnbVLacmcEJA8phywK7JinQ==} + /@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.2: + resolution: {integrity: sha512-9bfjwDxIDWmmOKusUcqdS4Rw+SETlp9Dy39Xui9BEGEk19dDwH0jhipwFzEff/pFg95NKymc6TOTbRKcWeRqyQ==} cpu: [arm64] os: [darwin] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-darwin-x64@2.2.0: - resolution: {integrity: sha512-vq0tT8sjZsy4JdSqmadWVw6f66UXqUCabLmUVHZwUFzMgtgoIIQjT4VVRHKvlof3P/dMCkbMJ5hB1oJ9OWHaaw==} + /@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.2: + resolution: {integrity: sha512-lwriRAHm1Yg4iDf23Oxm9n/t5Zpw1lVnxYU3HnJPTi2lJRkKTrps1KVgvL6m7WvmhYVt/FIsssWay+k45QHeuw==} cpu: [x64] os: [darwin] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-linux-arm64@2.2.0: - resolution: {integrity: sha512-hlxxLdRmPyq16QCutUtP8Tm6RDWcyaLsRssaHROatgnkOxdleMTgetf9JsdncL8vLh7FVy/RN9i3XR5dnb9cRA==} + /@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.2: + resolution: {integrity: sha512-FU20Bo66/f7He9Fp9sP2zaJ1Q8L9uLPZQDub/WlUip78JlPeMbVL8546HbZfcW9LNciEXc8d+tThSJjSC+tmsg==} cpu: [arm64] os: [linux] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-linux-arm@2.2.0: - resolution: {integrity: sha512-SaJ3Qq4lX9Syd2xEo9u3qPxi/OB+5JO/ngJKK97XDpa1C587H9EWYO6KD8995DAjSinWvdHKRrCOXVUC5fvGOg==} + /@msgpackr-extract/msgpackr-extract-linux-arm@3.0.2: + resolution: {integrity: sha512-MOI9Dlfrpi2Cuc7i5dXdxPbFIgbDBGgKR5F2yWEa6FVEtSWncfVNKW5AKjImAQ6CZlBK9tympdsZJ2xThBiWWA==} cpu: [arm] os: [linux] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-linux-x64@2.2.0: - resolution: {integrity: sha512-94y5PJrSOqUNcFKmOl7z319FelCLAE0rz/jPCWS+UtdMZvpa4jrQd+cJPQCLp2Fes1yAW/YUQj/Di6YVT3c3Iw==} + /@msgpackr-extract/msgpackr-extract-linux-x64@3.0.2: + resolution: {integrity: sha512-gsWNDCklNy7Ajk0vBBf9jEx04RUxuDQfBse918Ww+Qb9HCPoGzS+XJTLe96iN3BVK7grnLiYghP/M4L8VsaHeA==} cpu: [x64] os: [linux] requiresBuild: true dev: false optional: true - /@msgpackr-extract/msgpackr-extract-win32-x64@2.2.0: - resolution: {integrity: sha512-XrC0JzsqQSvOyM3t04FMLO6z5gCuhPE6k4FXuLK5xf52ZbdvcFe1yBmo7meCew9B8G2f0T9iu9t3kfTYRYROgA==} + /@msgpackr-extract/msgpackr-extract-win32-x64@3.0.2: + resolution: {integrity: sha512-O+6Gs8UeDbyFpbSh2CPEz/UOrrdWPTBYNblZK5CxxLisYt4kGX3Sc+czffFonyjiGSq3jWLwJS/CCJc7tBr4sQ==} cpu: [x64] os: [win32] requiresBuild: true @@ -8159,7 +8159,7 @@ packages: '@typescript-eslint/scope-manager': 5.59.8 '@typescript-eslint/type-utils': 5.59.8(eslint@8.41.0)(typescript@5.1.3) '@typescript-eslint/utils': 5.59.8(eslint@8.41.0)(typescript@5.1.3) - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) eslint: 8.41.0 grapheme-splitter: 1.0.4 ignore: 5.2.4 @@ -8204,7 +8204,7 @@ packages: '@typescript-eslint/scope-manager': 5.59.8 '@typescript-eslint/types': 5.59.8 '@typescript-eslint/typescript-estree': 5.59.8(typescript@5.1.3) - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) eslint: 8.41.0 typescript: 5.1.3 transitivePeerDependencies: @@ -8259,7 +8259,7 @@ packages: dependencies: '@typescript-eslint/typescript-estree': 5.59.8(typescript@5.1.3) '@typescript-eslint/utils': 5.59.8(eslint@8.41.0)(typescript@5.1.3) - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) eslint: 8.41.0 tsutils: 3.21.0(typescript@5.1.3) typescript: 5.1.3 @@ -8309,7 +8309,7 @@ packages: dependencies: '@typescript-eslint/types': 5.59.8 '@typescript-eslint/visitor-keys': 5.59.8 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) globby: 11.1.0 is-glob: 4.0.3 semver: 7.5.1 @@ -8830,7 +8830,7 @@ packages: resolution: {integrity: sha512-RZNwNclF7+MS/8bDg70amg32dyeZGZxiDuQmZxKLAlQjr3jGyLx+4Kkk58UO7D2QdgFIQCovuSuZESne6RG6XQ==} engines: {node: '>= 6.0.0'} dependencies: - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) transitivePeerDependencies: - supports-color @@ -8838,7 +8838,7 @@ packages: resolution: {integrity: sha512-Zn4cw2NEqd+9fiSVWMscnjyQ1a8Yfoc5oBajLeo5w+YBHgDUcEBY2hS4YpTz6iN5f/2zQiktcuM6tS8x1p9dpA==} engines: {node: '>= 8.0.0'} dependencies: - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) depd: 1.1.2 humanize-ms: 1.2.1 transitivePeerDependencies: @@ -9324,7 +9324,7 @@ packages: resolution: {integrity: sha512-bbCQdg7bpEv6kGH41RO/3B2/GMMmJSo2iBK+X8AWN9mujtfUipMDfIjsgHCfpnKqoGEQrrmCDKSa5OQ19+fDmg==} dependencies: archy: 1.0.0 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) fastq: 1.15.0 transitivePeerDependencies: - supports-color @@ -9347,7 +9347,7 @@ packages: /axios@0.24.0: resolution: {integrity: sha512-Q6cWsys88HoPgAaFAVUb0WpPk0O8iTeisR9IMqy9G8AbO4NlpVknrnQS03zzF9PGAWgO3cgletO3VjV/P7VztA==} dependencies: - follow-redirects: 1.15.2 + follow-redirects: 1.15.2(debug@4.3.4) transitivePeerDependencies: - debug dev: false @@ -9858,18 +9858,17 @@ packages: dependencies: node-gyp-build: 4.6.0 - /bull@4.10.4: - resolution: {integrity: sha512-o9m/7HjS/Or3vqRd59evBlWCXd9Lp+ALppKseoSKHaykK46SmRjAilX98PgmOz1yeVaurt8D5UtvEt4bUjM3eA==} - engines: {node: '>=12'} + /bullmq@3.15.0: + resolution: {integrity: sha512-U0LSRjuoyIBpnE62T4maCWMYEt3qdBCa1lnlPxYKQmRF/Y+FQ9W6iW5JvNNN+NA5Jet7k0uX71a93EX1zGnrhw==} dependencies: - cron-parser: 4.7.1 - debuglog: 1.0.1 - get-port: 5.1.1 + cron-parser: 4.8.1 + glob: 8.1.0 ioredis: 5.3.2 lodash: 4.17.21 - msgpackr: 1.8.1 + msgpackr: 1.9.2 semver: 7.5.1 - uuid: 8.3.2 + tslib: 2.5.2 + uuid: 9.0.0 transitivePeerDependencies: - supports-color dev: false @@ -10759,11 +10758,11 @@ packages: readable-stream: 3.6.0 dev: false - /cron-parser@4.7.1: - resolution: {integrity: sha512-WguFaoQ0hQ61SgsCZLHUcNbAvlK0lypKXu62ARguefYmjzaOXIVRNrAmyXzabTwUn4sQvQLkk6bjH+ipGfw8bA==} + /cron-parser@4.8.1: + resolution: {integrity: sha512-jbokKWGcyU4gl6jAfX97E1gDpY12DJ1cLJZmoDzaAln/shZ+S3KBFBuA2Q6WeUN4gJf/8klnV1EfvhA2lK5IRQ==} engines: {node: '>=12.0.0'} dependencies: - luxon: 3.2.1 + luxon: 3.3.0 dev: false /cropperjs@2.0.0-beta.2: @@ -11016,16 +11015,6 @@ packages: dependencies: ms: 2.0.0 - /debug@3.2.7: - resolution: {integrity: sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==} - peerDependencies: - supports-color: '*' - peerDependenciesMeta: - supports-color: - optional: true - dependencies: - ms: 2.1.3 - /debug@3.2.7(supports-color@8.1.1): resolution: {integrity: sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==} peerDependencies: @@ -11036,18 +11025,6 @@ packages: dependencies: ms: 2.1.3 supports-color: 8.1.1 - dev: true - - /debug@4.3.4: - resolution: {integrity: sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==} - engines: {node: '>=6.0'} - peerDependencies: - supports-color: '*' - peerDependenciesMeta: - supports-color: - optional: true - dependencies: - ms: 2.1.2 /debug@4.3.4(supports-color@8.1.1): resolution: {integrity: sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==} @@ -11061,10 +11038,6 @@ packages: ms: 2.1.2 supports-color: 8.1.1 - /debuglog@1.0.1: - resolution: {integrity: sha512-syBZ+rnAK3EgMsH2aYEOLUW7mZSY9Gb+0wUMCFsZvcmiz+HigA0LOcq/HoQqVuGG+EKykunc7QG2bzrponfaSw==} - dev: false - /decamelize-keys@1.1.1: resolution: {integrity: sha512-WiPxgEirIV0/eIOMcnFBA3/IJZAZqKnwAwWyvvdi4lsr1WCN22nhdf/3db3DoZcUjTV2SqfzIwNyp6y2xs3nmg==} engines: {node: '>=0.10.0'} @@ -11758,7 +11731,7 @@ packages: /eslint-import-resolver-node@0.3.7: resolution: {integrity: sha512-gozW2blMLJCeFpBwugLTGyvVjNoeo1knonXAcatC6bjPBZitotxdWf7Gimr25N4c0AAOo4eOUfaG82IJPDpqCA==} dependencies: - debug: 3.2.7 + debug: 3.2.7(supports-color@8.1.1) is-core-module: 2.11.0 resolve: 1.22.1 transitivePeerDependencies: @@ -11816,7 +11789,7 @@ packages: optional: true dependencies: '@typescript-eslint/parser': 5.59.8(eslint@8.41.0)(typescript@5.1.3) - debug: 3.2.7 + debug: 3.2.7(supports-color@8.1.1) eslint: 8.41.0 eslint-import-resolver-node: 0.3.7 transitivePeerDependencies: @@ -11870,7 +11843,7 @@ packages: array-includes: 3.1.6 array.prototype.flat: 1.3.1 array.prototype.flatmap: 1.3.1 - debug: 3.2.7 + debug: 3.2.7(supports-color@8.1.1) doctrine: 2.1.0 eslint: 8.41.0 eslint-import-resolver-node: 0.3.7 @@ -11996,7 +11969,7 @@ packages: ajv: 6.12.6 chalk: 4.1.2 cross-spawn: 7.0.3 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) doctrine: 3.0.0 escape-string-regexp: 4.0.0 eslint-scope: 7.2.0 @@ -12760,16 +12733,6 @@ packages: readable-stream: 2.3.7 dev: false - /follow-redirects@1.15.2: - resolution: {integrity: sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==} - engines: {node: '>=4.0'} - peerDependencies: - debug: '*' - peerDependenciesMeta: - debug: - optional: true - dev: false - /follow-redirects@1.15.2(debug@4.3.4): resolution: {integrity: sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==} engines: {node: '>=4.0'} @@ -12780,7 +12743,6 @@ packages: optional: true dependencies: debug: 4.3.4(supports-color@8.1.1) - dev: true /for-each@0.3.3: resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==} @@ -13050,6 +13012,7 @@ packages: /get-port@5.1.1: resolution: {integrity: sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==} engines: {node: '>=8'} + dev: true /get-stream@3.0.0: resolution: {integrity: sha512-GlhdIUuVakc8SJ6kK0zAFbiGzRFzNnY4jUuEbV9UROo4Y+0Ny4fjvcZFVTeDA4odpFyOQzaw6hXukJSq/f28sQ==} @@ -13656,7 +13619,7 @@ packages: dependencies: '@tootallnate/once': 2.0.0 agent-base: 6.0.2 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) transitivePeerDependencies: - supports-color dev: false @@ -13706,7 +13669,7 @@ packages: engines: {node: '>= 4.5.0'} dependencies: agent-base: 4.3.0 - debug: 3.2.7 + debug: 3.2.7(supports-color@8.1.1) transitivePeerDependencies: - supports-color dev: false @@ -13727,7 +13690,7 @@ packages: engines: {node: '>= 6'} dependencies: agent-base: 6.0.2 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) transitivePeerDependencies: - supports-color @@ -13892,7 +13855,7 @@ packages: dependencies: '@ioredis/commands': 1.2.0 cluster-key-slot: 1.1.2 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) denque: 2.1.0 lodash.defaults: 4.2.0 lodash.isarguments: 3.1.0 @@ -14474,7 +14437,7 @@ packages: resolution: {integrity: sha512-n3s8EwkdFIJCG3BPKBYvskgXGoy88ARzvegkitk60NxRdwltLOTaH7CUiMRXvwYorl0Q712iEjcWB+fK/MrWVw==} engines: {node: '>=10'} dependencies: - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) istanbul-lib-coverage: 3.2.0 source-map: 0.6.1 transitivePeerDependencies: @@ -15757,8 +15720,8 @@ packages: engines: {node: '>=16.14'} dev: true - /luxon@3.2.1: - resolution: {integrity: sha512-QrwPArQCNLAKGO/C+ZIilgIuDnEnKx5QYODdDtbFaxzsbZcc/a7WFq7MhsVYgRlwawLtvOUESTlfJ+hc/USqPg==} + /luxon@3.3.0: + resolution: {integrity: sha512-An0UCfG/rSiqtAIiBPO0Y9/zAnHUZxAMiCpTd5h2smgsj7GGmcenvrvww2cqNA8/4A5ZrD1gJpHN2mIHZQF+Mg==} engines: {node: '>=12'} dev: false @@ -16236,25 +16199,26 @@ packages: engines: {node: '>=12.13'} dev: false - /msgpackr-extract@2.2.0: - resolution: {integrity: sha512-0YcvWSv7ZOGl9Od6Y5iJ3XnPww8O7WLcpYMDwX+PAA/uXLDtyw94PJv9GLQV/nnp3cWlDhMoyKZIQLrx33sWog==} + /msgpackr-extract@3.0.2: + resolution: {integrity: sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==} + hasBin: true requiresBuild: true dependencies: - node-gyp-build-optional-packages: 5.0.3 + node-gyp-build-optional-packages: 5.0.7 optionalDependencies: - '@msgpackr-extract/msgpackr-extract-darwin-arm64': 2.2.0 - '@msgpackr-extract/msgpackr-extract-darwin-x64': 2.2.0 - '@msgpackr-extract/msgpackr-extract-linux-arm': 2.2.0 - '@msgpackr-extract/msgpackr-extract-linux-arm64': 2.2.0 - '@msgpackr-extract/msgpackr-extract-linux-x64': 2.2.0 - '@msgpackr-extract/msgpackr-extract-win32-x64': 2.2.0 + '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.2 + '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.2 + '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.2 + '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.2 + '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.2 + '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.2 dev: false optional: true - /msgpackr@1.8.1: - resolution: {integrity: sha512-05fT4J8ZqjYlR4QcRDIhLCYKUOHXk7C/xa62GzMKj74l3up9k2QZ3LgFc6qWdsPHl91QA2WLWqWc8b8t7GLNNw==} + /msgpackr@1.9.2: + resolution: {integrity: sha512-xtDgI3Xv0AAiZWLRGDchyzBwU6aq0rwJ+W+5Y4CZhEWtkl/hJtFFLc+3JtGTw7nz1yquxs7nL8q/yA2aqpflIQ==} optionalDependencies: - msgpackr-extract: 2.2.0 + msgpackr-extract: 3.0.2 dev: false /msw-storybook-addon@1.8.0(msw@1.2.1): @@ -16407,7 +16371,7 @@ packages: resolution: {integrity: sha512-6R9fqJ5Zcmf+uYaFgdIHmLwNldn5HbK8L5ybn7Uz+ylX/rnOsSp1AHcvQSrCaFN+qNM1wpymHqD7mVasEOlHGQ==} engines: {node: '>= 4.4.x'} dependencies: - debug: 3.2.7 + debug: 3.2.7(supports-color@8.1.1) iconv-lite: 0.4.24 sax: 1.2.4 transitivePeerDependencies: @@ -16507,8 +16471,9 @@ packages: fetch-blob: 3.2.0 formdata-polyfill: 4.0.10 - /node-gyp-build-optional-packages@5.0.3: - resolution: {integrity: sha512-k75jcVzk5wnnc/FMxsf4udAoTEUv2jY3ycfdSd3yWu6Cnd1oee6/CfZJApyscA4FJOmdoixWwiwOyf16RzD5JA==} + /node-gyp-build-optional-packages@5.0.7: + resolution: {integrity: sha512-YlCCc6Wffkx0kHkmam79GKvDQ6x+QZkMjFGrIMxgFNILFvGSbCp2fCBC55pGTT9gVaz8Na5CLmxt/urtzRv36w==} + hasBin: true dev: false optional: true @@ -19511,7 +19476,7 @@ packages: engines: {node: '>= 10'} dependencies: agent-base: 6.0.2 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) socks: 2.7.1 transitivePeerDependencies: - supports-color @@ -20633,7 +20598,7 @@ packages: chalk: 4.1.2 cli-highlight: 2.1.11 date-fns: 2.30.0 - debug: 4.3.4 + debug: 4.3.4(supports-color@8.1.1) dotenv: 16.0.3 glob: 8.1.0 ioredis: 5.3.2