|
|
|
@@ -9,6 +9,7 @@ import type { Config } from '@/config.js';
|
|
|
|
|
import { DI } from '@/di-symbols.js';
|
|
|
|
|
import type Logger from '@/logger.js';
|
|
|
|
|
import { bindThis } from '@/decorators.js';
|
|
|
|
|
import { envOption } from '@/env.js';
|
|
|
|
|
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
|
|
|
|
|
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
|
|
|
|
|
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
|
|
|
|
@@ -41,6 +42,7 @@ import { CleanProcessorService } from './processors/CleanProcessorService.js';
|
|
|
|
|
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
|
|
|
|
|
import { QueueLoggerService } from './QueueLoggerService.js';
|
|
|
|
|
import { QUEUE, baseQueueOptions } from './const.js';
|
|
|
|
|
import type { InboxJobData } from './types.js';
|
|
|
|
|
|
|
|
|
|
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
|
|
|
|
|
function httpRelatedBackoff(attemptsMade: number) {
|
|
|
|
@@ -68,13 +70,31 @@ function getJobInfo(job: Bull.Job | undefined, increment = false): string {
|
|
|
|
|
return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function renderError(e: Error): any {
|
|
|
|
|
if (e) { // 何故かeがundefinedで来ることがある
|
|
|
|
|
return {
|
|
|
|
|
...Object.getOwnPropertyNames(e).reduce((acc, key) => {
|
|
|
|
|
//@ts-expect-error Element implicitly has an 'any' type because expression of type 'string' can't be used to index type 'Error'.
|
|
|
|
|
acc[key] = e[key];
|
|
|
|
|
return acc;
|
|
|
|
|
}, {} as Record<string, any>),
|
|
|
|
|
};
|
|
|
|
|
} else {
|
|
|
|
|
return {
|
|
|
|
|
stack: '?',
|
|
|
|
|
message: '?',
|
|
|
|
|
name: '?',
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Injectable()
|
|
|
|
|
export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
private logger: Logger;
|
|
|
|
|
private systemQueueWorker: Bull.Worker;
|
|
|
|
|
private dbQueueWorker: Bull.Worker;
|
|
|
|
|
private deliverQueueWorker: Bull.Worker;
|
|
|
|
|
private inboxQueueWorker: Bull.Worker;
|
|
|
|
|
private inboxQueueWorker: Bull.Worker<InboxJobData>;
|
|
|
|
|
private webhookDeliverQueueWorker: Bull.Worker;
|
|
|
|
|
private relationshipQueueWorker: Bull.Worker;
|
|
|
|
|
private objectStorageQueueWorker: Bull.Worker;
|
|
|
|
@@ -118,22 +138,6 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
) {
|
|
|
|
|
this.logger = this.queueLoggerService.logger;
|
|
|
|
|
|
|
|
|
|
function renderError(e: Error): any {
|
|
|
|
|
if (e) { // 何故かeがundefinedで来ることがある
|
|
|
|
|
return {
|
|
|
|
|
stack: e.stack,
|
|
|
|
|
message: e.message,
|
|
|
|
|
name: e.name,
|
|
|
|
|
};
|
|
|
|
|
} else {
|
|
|
|
|
return {
|
|
|
|
|
stack: '?',
|
|
|
|
|
message: '?',
|
|
|
|
|
name: '?',
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//#region system
|
|
|
|
|
this.systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => {
|
|
|
|
|
switch (job.name) {
|
|
|
|
@@ -155,8 +159,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
this.systemQueueWorker
|
|
|
|
|
.on('active', (job) => systemLogger.debug(`active id=${job.id}`))
|
|
|
|
|
.on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
|
|
|
|
|
.on('failed', (job, err) => systemLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
|
|
|
|
|
.on('error', (err: Error) => systemLogger.error(`error ${err.stack}`, { e: renderError(err) }))
|
|
|
|
|
.on('failed', (job, err) => systemLogger.warn(`failed(${err.message}) id=${job ? job.id : '-'}`, { err: renderError(err), data: job?.data }))
|
|
|
|
|
.on('error', (err: Error) => systemLogger.error('error', { err: renderError(err) }))
|
|
|
|
|
.on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`));
|
|
|
|
|
//#endregion
|
|
|
|
|
|
|
|
|
@@ -194,8 +198,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
this.dbQueueWorker
|
|
|
|
|
.on('active', (job) => dbLogger.debug(`active id=${job.id}`))
|
|
|
|
|
.on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
|
|
|
|
|
.on('failed', (job, err) => dbLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
|
|
|
|
|
.on('error', (err: Error) => dbLogger.error(`error ${err.stack}`, { e: renderError(err) }))
|
|
|
|
|
.on('failed', (job, err) => dbLogger.warn(`failed(${err.message}) id=${job ? job.id : '-'}`, { err: renderError(err), job }))
|
|
|
|
|
.on('error', (err: Error) => dbLogger.error('error', { err: renderError(err) }))
|
|
|
|
|
.on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`));
|
|
|
|
|
//#endregion
|
|
|
|
|
|
|
|
|
@@ -218,8 +222,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
this.deliverQueueWorker
|
|
|
|
|
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
|
|
|
|
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
|
|
|
|
.on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
|
|
|
|
|
.on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { e: renderError(err) }))
|
|
|
|
|
.on('failed', (job, err) => deliverLogger.warn(`failed(${err.message}) ${getJobInfo(job)} id=${job ? job.id : '-'} to=${job?.data.to}`, { err: renderError(err), data: job?.data }))
|
|
|
|
|
.on('error', (err: Error) => deliverLogger.error('error', { err: renderError(err) }))
|
|
|
|
|
.on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`));
|
|
|
|
|
//#endregion
|
|
|
|
|
|
|
|
|
@@ -242,8 +246,11 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
this.inboxQueueWorker
|
|
|
|
|
.on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
|
|
|
|
|
.on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
|
|
|
|
|
.on('failed', (job, err) => inboxLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) }))
|
|
|
|
|
.on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { e: renderError(err) }))
|
|
|
|
|
.on('failed', (job, err) => inboxLogger.warn(
|
|
|
|
|
`failed(${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`,
|
|
|
|
|
{ err: renderError(err), data: (job && !envOption.logJson) ? { activity: JSON.stringify(job.data.activity), signature: JSON.stringify(job.data.signature) } : null },
|
|
|
|
|
))
|
|
|
|
|
.on('error', (err: Error) => inboxLogger.error('error', { err: renderError(err) }))
|
|
|
|
|
.on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`));
|
|
|
|
|
//#endregion
|
|
|
|
|
|
|
|
|
@@ -266,8 +273,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
this.webhookDeliverQueueWorker
|
|
|
|
|
.on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
|
|
|
|
.on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
|
|
|
|
.on('failed', (job, err) => webhookLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
|
|
|
|
|
.on('error', (err: Error) => webhookLogger.error(`error ${err.stack}`, { e: renderError(err) }))
|
|
|
|
|
.on('failed', (job, err) => webhookLogger.warn(`failed(${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`, { err: renderError(err) }))
|
|
|
|
|
.on('error', (err: Error) => webhookLogger.error('error', { err: renderError(err) }))
|
|
|
|
|
.on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`));
|
|
|
|
|
//#endregion
|
|
|
|
|
|
|
|
|
@@ -295,8 +302,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
this.relationshipQueueWorker
|
|
|
|
|
.on('active', (job) => relationshipLogger.debug(`active id=${job.id}`))
|
|
|
|
|
.on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`))
|
|
|
|
|
.on('failed', (job, err) => relationshipLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
|
|
|
|
|
.on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { e: renderError(err) }))
|
|
|
|
|
.on('failed', (job, err) => relationshipLogger.warn(`failed(${err.message}) id=${job ? job.id : '-'}`, { err: renderError(err), job }))
|
|
|
|
|
.on('error', (err: Error) => relationshipLogger.error('error', { err: renderError(err) }))
|
|
|
|
|
.on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`));
|
|
|
|
|
//#endregion
|
|
|
|
|
|
|
|
|
@@ -318,8 +325,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|
|
|
|
this.objectStorageQueueWorker
|
|
|
|
|
.on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
|
|
|
|
|
.on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
|
|
|
|
|
.on('failed', (job, err) => objectStorageLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
|
|
|
|
|
.on('error', (err: Error) => objectStorageLogger.error(`error ${err.stack}`, { e: renderError(err) }))
|
|
|
|
|
.on('failed', (job, err) => objectStorageLogger.warn(`failed(${err.message}) id=${job ? job.id : '-'}`, { err: renderError(err), job }))
|
|
|
|
|
.on('error', (err: Error) => objectStorageLogger.error('error', { err: renderError(err) }))
|
|
|
|
|
.on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`));
|
|
|
|
|
//#endregion
|
|
|
|
|
|
|
|
|
|