From 7b38806413b84bd20070f3c81b899e5b890b1a8b Mon Sep 17 00:00:00 2001 From: syuilo <4439005+syuilo@users.noreply.github.com> Date: Sat, 19 Apr 2025 14:00:38 +0900 Subject: [PATCH] feat: Job queue inspector (#15856) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wip * wip * Update job-queue.vue * wip * wip * Update job-queue.vue * wip * Update job-queue.vue * wip * Update QueueService.ts * Update QueueService.ts * Update QueueService.ts * Update job-queue.vue * wip * wip * wip * Update job-queue.vue * wip * Update MkTl.vue * wip * Update index.vue * wip * wip * Update MkTl.vue * 🎨 * jobs search * wip * Update job-queue.vue * wip * wip * Update job-queue.vue * Update job-queue.vue * Update job-queue.vue * Update job-queue.vue * wip * Update job-queue.job.vue * wip * wip * wip * Update MkCode.vue * wip * Update job-queue.job.vue * wip * Update job-queue.job.vue * Update misskey-js.api.md * Update CHANGELOG.md * Update job-queue.job.vue --- CHANGELOG.md | 2 + locales/index.d.ts | 4 + locales/ja-JP.yml | 1 + packages/backend/package.json | 2 + .../backend/src/core/NoteCreateService.ts | 9 +- packages/backend/src/core/QueueService.ts | 470 +++++++++++++++--- .../src/queue/QueueProcessorService.ts | 20 +- packages/backend/src/queue/const.ts | 10 + .../backend/src/server/api/endpoint-list.ts | 8 +- .../server/api/endpoints/admin/queue/clear.ts | 8 +- .../server/api/endpoints/admin/queue/jobs.ts | 38 ++ .../api/endpoints/admin/queue/promote-jobs.ts | 39 ++ .../api/endpoints/admin/queue/promote.ts | 77 --- .../api/endpoints/admin/queue/queue-stats.ts | 36 ++ .../api/endpoints/admin/queue/queues.ts | 35 ++ .../api/endpoints/admin/queue/remove-job.ts | 38 ++ .../api/endpoints/admin/queue/retry-job.ts | 38 ++ .../api/endpoints/admin/queue/show-job.ts | 38 ++ packages/frontend/src/components/MkCode.vue | 8 +- packages/frontend/src/components/MkFolder.vue | 46 +- packages/frontend/src/components/MkTabs.vue | 235 +++++++++ packages/frontend/src/components/MkTl.vue | 173 +++++++ ...e => federation-job-queue.chart.chart.vue} | 0 ...art.vue => federation-job-queue.chart.vue} | 4 +- .../{queue.vue => federation-job-queue.vue} | 8 +- packages/frontend/src/pages/admin/index.vue | 13 +- .../src/pages/admin/job-queue.chart.vue | 127 +++++ .../src/pages/admin/job-queue.job.vue | 280 +++++++++++ .../frontend/src/pages/admin/job-queue.vue | 370 ++++++++++++++ packages/frontend/src/router.definition.ts | 10 +- packages/misskey-js/etc/misskey-js.api.md | 24 +- .../misskey-js/src/autogen/apiClientJSDoc.ts | 68 ++- packages/misskey-js/src/autogen/endpoint.ts | 15 +- packages/misskey-js/src/autogen/entities.ts | 7 +- packages/misskey-js/src/autogen/types.ts | 381 +++++++++++++- pnpm-lock.yaml | 18 + 36 files changed, 2448 insertions(+), 212 deletions(-) create mode 100644 packages/backend/src/server/api/endpoints/admin/queue/jobs.ts create mode 100644 packages/backend/src/server/api/endpoints/admin/queue/promote-jobs.ts delete mode 100644 packages/backend/src/server/api/endpoints/admin/queue/promote.ts create mode 100644 packages/backend/src/server/api/endpoints/admin/queue/queue-stats.ts create mode 100644 packages/backend/src/server/api/endpoints/admin/queue/queues.ts create mode 100644 packages/backend/src/server/api/endpoints/admin/queue/remove-job.ts create mode 100644 packages/backend/src/server/api/endpoints/admin/queue/retry-job.ts create mode 100644 packages/backend/src/server/api/endpoints/admin/queue/show-job.ts create mode 100644 packages/frontend/src/components/MkTabs.vue create mode 100644 packages/frontend/src/components/MkTl.vue rename packages/frontend/src/pages/admin/{queue.chart.chart.vue => federation-job-queue.chart.chart.vue} (100%) rename packages/frontend/src/pages/admin/{queue.chart.vue => federation-job-queue.chart.vue} (97%) rename packages/frontend/src/pages/admin/{queue.vue => federation-job-queue.vue} (87%) create mode 100644 packages/frontend/src/pages/admin/job-queue.chart.vue create mode 100644 packages/frontend/src/pages/admin/job-queue.job.vue create mode 100644 packages/frontend/src/pages/admin/job-queue.vue diff --git a/CHANGELOG.md b/CHANGELOG.md index 87dc28d6c0..0524d3c5bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 2025.4.1 ### General +- Feat: bull-boardに代わるジョブキューの管理ツールが実装されました - Enhance: チャットの新規メッセージをプッシュ通知するように ### Client @@ -18,6 +19,7 @@ - Fix: アカウントの移行時にアンテナのフィルターのユーザが更新されない問題を修正 #15843 ### Server +- Enhance: ジョブキューの成功/失敗したジョブも一定数・一定期間保存するようにし、後から問題を調査することを容易に - Enhance: フォローしているユーザーならフォロワー限定投稿のノートでもアンテナで検知できるように (Cherry-picked from https://github.com/yojo-art/cherrypick/pull/568 and https://github.com/team-shahu/misskey/pull/38) - Fix: システムアカウントの名前がサーバー名と同期されない問題を修正 diff --git a/locales/index.d.ts b/locales/index.d.ts index 97e66be6d7..cb632dfb25 100644 --- a/locales/index.d.ts +++ b/locales/index.d.ts @@ -5398,6 +5398,10 @@ export interface Locale extends ILocale { * デッキへ戻る */ "goToDeck": string; + /** + * 連合ジョブ + */ + "federationJobs": string; "_chat": { /** * まだメッセージはありません diff --git a/locales/ja-JP.yml b/locales/ja-JP.yml index 1801c8e15b..fb2590d8fd 100644 --- a/locales/ja-JP.yml +++ b/locales/ja-JP.yml @@ -1345,6 +1345,7 @@ embed: "埋め込み" settingsMigrating: "設定を移行しています。しばらくお待ちください... (後ほど、設定→その他→旧設定情報を移行 で手動で移行することもできます)" readonly: "読み取り専用" goToDeck: "デッキへ戻る" +federationJobs: "連合ジョブ" _chat: noMessagesYet: "まだメッセージはありません" diff --git a/packages/backend/package.json b/packages/backend/package.json index 5e8529c422..e8b3d74aa7 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -93,6 +93,7 @@ "@swc/cli": "0.6.0", "@swc/core": "1.11.18", "@twemoji/parser": "15.1.1", + "@types/redis-info": "3.0.3", "accepts": "1.3.8", "ajv": "8.17.1", "archiver": "7.0.1", @@ -159,6 +160,7 @@ "random-seed": "0.3.0", "ratelimiter": "3.4.1", "re2": "1.21.4", + "redis-info": "3.1.0", "redis-lock": "0.1.4", "reflect-metadata": "0.2.2", "rename": "1.0.4", diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 1ddb2b173d..469426f87e 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -576,7 +576,14 @@ export class NoteCreateService implements OnApplicationShutdown { noteId: note.id, }, { delay, - removeOnComplete: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index eb02355625..a1e806816b 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -5,6 +5,8 @@ import { randomUUID } from 'node:crypto'; import { Inject, Injectable } from '@nestjs/common'; +import { MetricsTime, type JobType } from 'bullmq'; +import { parse as parseRedisInfo } from 'redis-info'; import type { IActivity } from '@/core/activitypub/type.js'; import type { MiDriveFile } from '@/models/DriveFile.js'; import type { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js'; @@ -69,50 +71,58 @@ export class QueueService { this.systemQueue.add('tickCharts', { }, { repeat: { pattern: '55 * * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('resyncCharts', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('cleanCharts', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('aggregateRetention', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('clean', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('checkExpiredMutings', { }, { repeat: { pattern: '*/5 * * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('bakeBufferedReactions', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('checkModeratorsActivity', { }, { // 毎時30分に起動 repeat: { pattern: '30 * * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); } @@ -134,13 +144,21 @@ export class QueueService { isSharedInbox, }; - return this.deliverQueue.add(to, data, { + const label = to.replace('https://', '').replace('/inbox', ''); + + return this.deliverQueue.add(label, data, { attempts: this.config.deliverJobMaxAttempts ?? 12, backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -162,12 +180,18 @@ export class QueueService { backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }; await this.deliverQueue.addBulk(Array.from(inboxes.entries(), d => ({ - name: d[0], + name: d[0].replace('https://', '').replace('/inbox', ''), data: { user, content: contentBody, @@ -188,13 +212,21 @@ export class QueueService { signature, }; - return this.inboxQueue.add('', data, { + const label = (activity.id ?? '').replace('https://', '').replace('/activity', ''); + + return this.inboxQueue.add(label, data, { attempts: this.config.inboxJobMaxAttempts ?? 8, backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -203,8 +235,14 @@ export class QueueService { return this.dbQueue.add('deleteDriveFiles', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -213,8 +251,14 @@ export class QueueService { return this.dbQueue.add('exportCustomEmojis', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -223,8 +267,14 @@ export class QueueService { return this.dbQueue.add('exportNotes', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -233,8 +283,14 @@ export class QueueService { return this.dbQueue.add('exportClips', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -243,8 +299,14 @@ export class QueueService { return this.dbQueue.add('exportFavorites', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -255,8 +317,14 @@ export class QueueService { excludeMuting, excludeInactive, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -265,8 +333,14 @@ export class QueueService { return this.dbQueue.add('exportMuting', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -275,8 +349,14 @@ export class QueueService { return this.dbQueue.add('exportBlocking', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -285,8 +365,14 @@ export class QueueService { return this.dbQueue.add('exportUserLists', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -295,8 +381,14 @@ export class QueueService { return this.dbQueue.add('exportAntennas', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -307,8 +399,14 @@ export class QueueService { fileId: fileId, withReplies, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -324,8 +422,14 @@ export class QueueService { user: { id: user.id }, fileId: fileId, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -335,8 +439,14 @@ export class QueueService { user: { id: user.id }, fileId: fileId, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -356,8 +466,14 @@ export class QueueService { name, data, opts: { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }, }; } @@ -368,8 +484,14 @@ export class QueueService { user: { id: user.id }, fileId: fileId, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -379,8 +501,14 @@ export class QueueService { user: { id: user.id }, fileId: fileId, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -390,8 +518,14 @@ export class QueueService { user: { id: user.id }, antenna, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -401,8 +535,14 @@ export class QueueService { user: { id: user.id }, soft: opts.soft, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -452,8 +592,14 @@ export class QueueService { withReplies: data.withReplies, }, opts: { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, ...opts, }, }; @@ -464,16 +610,28 @@ export class QueueService { return this.objectStorageQueue.add('deleteFile', { key: key, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @bindThis public createCleanRemoteFilesJob() { return this.objectStorageQueue.add('cleanRemoteFiles', {}, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -504,8 +662,14 @@ export class QueueService { backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -535,13 +699,19 @@ export class QueueService { backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @bindThis - private getQueue(type: typeof QUEUE_TYPES[number]) { + private getQueue(type: typeof QUEUE_TYPES[number]): Bull.Queue { switch (type) { case 'system': return this.systemQueue; case 'endedPollNotification': return this.endedPollNotificationQueue; @@ -557,19 +727,173 @@ export class QueueService { } @bindThis - public clearQueue(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') { + public async queueClear(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') { const queue = this.getQueue(queueType); if (state === '*') { - queue.clean(0, 0, 'completed'); - queue.clean(0, 0, 'wait'); - queue.clean(0, 0, 'active'); - queue.clean(0, 0, 'paused'); - queue.clean(0, 0, 'prioritized'); - queue.clean(0, 0, 'delayed'); - queue.clean(0, 0, 'failed'); + await Promise.all([ + queue.clean(0, 0, 'completed'), + queue.clean(0, 0, 'wait'), + queue.clean(0, 0, 'active'), + queue.clean(0, 0, 'paused'), + queue.clean(0, 0, 'prioritized'), + queue.clean(0, 0, 'delayed'), + queue.clean(0, 0, 'failed'), + ]); } else { - queue.clean(0, 0, state); + await queue.clean(0, 0, state); } } + + @bindThis + public async queuePromoteJobs(queueType: typeof QUEUE_TYPES[number]) { + const queue = this.getQueue(queueType); + await queue.promoteJobs(); + } + + @bindThis + public async queueRetryJob(queueType: typeof QUEUE_TYPES[number], jobId: string) { + const queue = this.getQueue(queueType); + const job: Bull.Job | null = await queue.getJob(jobId); + if (job) { + if (job.finishedOn != null) { + await job.retry(); + } else { + await job.promote(); + } + } + } + + @bindThis + public async queueRemoveJob(queueType: typeof QUEUE_TYPES[number], jobId: string) { + const queue = this.getQueue(queueType); + const job: Bull.Job | null = await queue.getJob(jobId); + if (job) { + await job.remove(); + } + } + + @bindThis + private packJobData(job: Bull.Job) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + const stacktrace = job.stacktrace ? job.stacktrace.filter(Boolean) : []; + stacktrace.reverse(); + + return { + id: job.id, + name: job.name, + data: job.data, + opts: job.opts, + timestamp: job.timestamp, + processedOn: job.processedOn, + processedBy: job.processedBy, + finishedOn: job.finishedOn, + progress: job.progress, + attempts: job.attemptsMade, + delay: job.delay, + failedReason: job.failedReason, + stacktrace: stacktrace, + returnValue: job.returnvalue, + isFailed: !!job.failedReason || (Array.isArray(stacktrace) && stacktrace.length > 0), + }; + } + + @bindThis + public async queueGetJob(queueType: typeof QUEUE_TYPES[number], jobId: string) { + const queue = this.getQueue(queueType); + const job: Bull.Job | null = await queue.getJob(jobId); + if (job) { + return this.packJobData(job); + } else { + throw new Error(`Job not found: ${jobId}`); + } + } + + @bindThis + public async queueGetJobs(queueType: typeof QUEUE_TYPES[number], jobTypes: JobType[], search?: string) { + const RETURN_LIMIT = 100; + const queue = this.getQueue(queueType); + let jobs: Bull.Job[]; + + if (search) { + jobs = await queue.getJobs(jobTypes, 0, 1000); + + jobs = jobs.filter(job => { + const jobString = JSON.stringify(job).toLowerCase(); + return search.toLowerCase().split(' ').every(term => { + return jobString.includes(term); + }); + }); + + jobs = jobs.slice(0, RETURN_LIMIT); + } else { + jobs = await queue.getJobs(jobTypes, 0, RETURN_LIMIT); + } + + return jobs.map(job => this.packJobData(job)); + } + + @bindThis + public async queueGetQueues() { + const fetchings = QUEUE_TYPES.map(async type => { + const queue = this.getQueue(type); + + const counts = await queue.getJobCounts(); + const isPaused = await queue.isPaused(); + const metrics_completed = await queue.getMetrics('completed', 0, MetricsTime.ONE_WEEK); + const metrics_failed = await queue.getMetrics('failed', 0, MetricsTime.ONE_WEEK); + + return { + name: type, + counts: counts, + isPaused, + metrics: { + completed: metrics_completed, + failed: metrics_failed, + }, + }; + }); + + return await Promise.all(fetchings); + } + + @bindThis + public async queueGetQueue(queueType: typeof QUEUE_TYPES[number]) { + const queue = this.getQueue(queueType); + const counts = await queue.getJobCounts(); + const isPaused = await queue.isPaused(); + const metrics_completed = await queue.getMetrics('completed', 0, MetricsTime.ONE_WEEK); + const metrics_failed = await queue.getMetrics('failed', 0, MetricsTime.ONE_WEEK); + const db = parseRedisInfo(await (await queue.client).info()); + + return { + name: queueType, + qualifiedName: queue.qualifiedName, + counts: counts, + isPaused, + metrics: { + completed: metrics_completed, + failed: metrics_failed, + }, + db: { + version: db.redis_version, + mode: db.redis_mode, + runId: db.run_id, + processId: db.process_id, + port: parseInt(db.tcp_port), + os: db.os, + uptime: parseInt(db.uptime_in_seconds), + memory: { + total: parseInt(db.total_system_memory) || parseInt(db.maxmemory), + used: parseInt(db.used_memory), + fragmentationRatio: parseInt(db.mem_fragmentation_ratio), + peak: parseInt(db.used_memory_peak), + }, + clients: { + connected: parseInt(db.connected_clients), + blocked: parseInt(db.blocked_clients), + }, + }, + }; + } } diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 6940e1c188..c98ebcdcd9 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -44,7 +44,7 @@ import { BakeBufferedReactionsProcessorService } from './processors/BakeBuffered import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; -import { QUEUE, baseQueueOptions } from './const.js'; +import { QUEUE, baseWorkerOptions } from './const.js'; // ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 function httpRelatedBackoff(attemptsMade: number) { @@ -175,7 +175,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.SYSTEM), + ...baseWorkerOptions(this.config, QUEUE.SYSTEM), autorun: false, }); @@ -232,7 +232,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.DB), + ...baseWorkerOptions(this.config, QUEUE.DB), autorun: false, }); @@ -264,7 +264,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.deliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.DELIVER), + ...baseWorkerOptions(this.config, QUEUE.DELIVER), autorun: false, concurrency: this.config.deliverJobConcurrency ?? 128, limiter: { @@ -304,7 +304,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.inboxProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.INBOX), + ...baseWorkerOptions(this.config, QUEUE.INBOX), autorun: false, concurrency: this.config.inboxJobConcurrency ?? 16, limiter: { @@ -344,7 +344,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.userWebhookDeliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER), + ...baseWorkerOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER), autorun: false, concurrency: 64, limiter: { @@ -384,7 +384,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.systemWebhookDeliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER), + ...baseWorkerOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER), autorun: false, concurrency: 16, limiter: { @@ -434,7 +434,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP), + ...baseWorkerOptions(this.config, QUEUE.RELATIONSHIP), autorun: false, concurrency: this.config.relationshipJobConcurrency ?? 16, limiter: { @@ -479,7 +479,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE), + ...baseWorkerOptions(this.config, QUEUE.OBJECT_STORAGE), autorun: false, concurrency: 16, }); @@ -512,7 +512,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.endedPollNotificationProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), + ...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), autorun: false, }); } diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 67f689b618..7e146a7e03 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -3,6 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-only */ +import { MetricsTime } from 'bullmq'; import { Config } from '@/config.js'; import type * as Bull from 'bullmq'; @@ -27,3 +28,12 @@ export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof t prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`, }; } + +export function baseWorkerOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions { + return { + ...baseQueueOptions(config, queueName), + metrics: { + maxDataPoints: MetricsTime.ONE_WEEK, + }, + }; +} diff --git a/packages/backend/src/server/api/endpoint-list.ts b/packages/backend/src/server/api/endpoint-list.ts index 34aaef3cc7..e5170aa2dc 100644 --- a/packages/backend/src/server/api/endpoint-list.ts +++ b/packages/backend/src/server/api/endpoint-list.ts @@ -67,8 +67,14 @@ export * as 'admin/promo/create' from './endpoints/admin/promo/create.js'; export * as 'admin/queue/clear' from './endpoints/admin/queue/clear.js'; export * as 'admin/queue/deliver-delayed' from './endpoints/admin/queue/deliver-delayed.js'; export * as 'admin/queue/inbox-delayed' from './endpoints/admin/queue/inbox-delayed.js'; -export * as 'admin/queue/promote' from './endpoints/admin/queue/promote.js'; +export * as 'admin/queue/retry-job' from './endpoints/admin/queue/retry-job.js'; +export * as 'admin/queue/remove-job' from './endpoints/admin/queue/remove-job.js'; +export * as 'admin/queue/show-job' from './endpoints/admin/queue/show-job.js'; +export * as 'admin/queue/promote-jobs' from './endpoints/admin/queue/promote-jobs.js'; +export * as 'admin/queue/jobs' from './endpoints/admin/queue/jobs.js'; export * as 'admin/queue/stats' from './endpoints/admin/queue/stats.js'; +export * as 'admin/queue/queues' from './endpoints/admin/queue/queues.js'; +export * as 'admin/queue/queue-stats' from './endpoints/admin/queue/queue-stats.js'; export * as 'admin/relays/add' from './endpoints/admin/relays/add.js'; export * as 'admin/relays/list' from './endpoints/admin/relays/list.js'; export * as 'admin/relays/remove' from './endpoints/admin/relays/remove.js'; diff --git a/packages/backend/src/server/api/endpoints/admin/queue/clear.ts b/packages/backend/src/server/api/endpoints/admin/queue/clear.ts index 3978f14f2d..81cb4b8119 100644 --- a/packages/backend/src/server/api/endpoints/admin/queue/clear.ts +++ b/packages/backend/src/server/api/endpoints/admin/queue/clear.ts @@ -19,10 +19,10 @@ export const meta = { export const paramDef = { type: 'object', properties: { - type: { type: 'string', enum: QUEUE_TYPES }, - state: { type: 'string', enum: ['*', 'wait', 'delayed'] }, + queue: { type: 'string', enum: QUEUE_TYPES }, + state: { type: 'string', enum: ['*', 'completed', 'wait', 'active', 'paused', 'prioritized', 'delayed', 'failed'] }, }, - required: ['type', 'state'], + required: ['queue', 'state'], } as const; @Injectable() @@ -32,7 +32,7 @@ export default class extends Endpoint { // eslint- private queueService: QueueService, ) { super(meta, paramDef, async (ps, me) => { - this.queueService.clearQueue(ps.type, ps.state); + this.queueService.queueClear(ps.queue, ps.state); this.moderationLogService.log(me, 'clearQueue'); }); diff --git a/packages/backend/src/server/api/endpoints/admin/queue/jobs.ts b/packages/backend/src/server/api/endpoints/admin/queue/jobs.ts new file mode 100644 index 0000000000..79731c9786 --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/queue/jobs.ts @@ -0,0 +1,38 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import { Endpoint } from '@/server/api/endpoint-base.js'; +import { ModerationLogService } from '@/core/ModerationLogService.js'; +import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js'; + +export const meta = { + tags: ['admin'], + + requireCredential: true, + requireModerator: true, + kind: 'read:admin:queue', +} as const; + +export const paramDef = { + type: 'object', + properties: { + queue: { type: 'string', enum: QUEUE_TYPES }, + state: { type: 'array', items: { type: 'string', enum: ['active', 'wait', 'delayed', 'completed', 'failed'] } }, + search: { type: 'string' }, + }, + required: ['queue', 'state'], +} as const; + +@Injectable() +export default class extends Endpoint { // eslint-disable-line import/no-default-export + constructor( + private queueService: QueueService, + ) { + super(meta, paramDef, async (ps, me) => { + return this.queueService.queueGetJobs(ps.queue, ps.state, ps.search); + }); + } +} diff --git a/packages/backend/src/server/api/endpoints/admin/queue/promote-jobs.ts b/packages/backend/src/server/api/endpoints/admin/queue/promote-jobs.ts new file mode 100644 index 0000000000..d22385e261 --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/queue/promote-jobs.ts @@ -0,0 +1,39 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import { Endpoint } from '@/server/api/endpoint-base.js'; +import { ModerationLogService } from '@/core/ModerationLogService.js'; +import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js'; + +export const meta = { + tags: ['admin'], + + requireCredential: true, + requireModerator: true, + kind: 'write:admin:queue', +} as const; + +export const paramDef = { + type: 'object', + properties: { + queue: { type: 'string', enum: QUEUE_TYPES }, + }, + required: ['queue'], +} as const; + +@Injectable() +export default class extends Endpoint { // eslint-disable-line import/no-default-export + constructor( + private moderationLogService: ModerationLogService, + private queueService: QueueService, + ) { + super(meta, paramDef, async (ps, me) => { + this.queueService.queuePromoteJobs(ps.queue); + + this.moderationLogService.log(me, 'promoteQueue'); + }); + } +} diff --git a/packages/backend/src/server/api/endpoints/admin/queue/promote.ts b/packages/backend/src/server/api/endpoints/admin/queue/promote.ts deleted file mode 100644 index 7502d4e1f7..0000000000 --- a/packages/backend/src/server/api/endpoints/admin/queue/promote.ts +++ /dev/null @@ -1,77 +0,0 @@ -/* - * SPDX-FileCopyrightText: syuilo and misskey-project - * SPDX-License-Identifier: AGPL-3.0-only - */ - -import { Injectable } from '@nestjs/common'; -import { Endpoint } from '@/server/api/endpoint-base.js'; -import { ModerationLogService } from '@/core/ModerationLogService.js'; -import { QueueService } from '@/core/QueueService.js'; - -export const meta = { - tags: ['admin'], - - requireCredential: true, - requireModerator: true, - kind: 'write:admin:queue', -} as const; - -export const paramDef = { - type: 'object', - properties: { - type: { type: 'string', enum: ['deliver', 'inbox'] }, - }, - required: ['type'], -} as const; - -@Injectable() -export default class extends Endpoint { // eslint-disable-line import/no-default-export - constructor( - private moderationLogService: ModerationLogService, - private queueService: QueueService, - ) { - super(meta, paramDef, async (ps, me) => { - let delayedQueues; - - switch (ps.type) { - case 'deliver': - delayedQueues = await this.queueService.deliverQueue.getDelayed(); - for (let queueIndex = 0; queueIndex < delayedQueues.length; queueIndex++) { - const queue = delayedQueues[queueIndex]; - try { - await queue.promote(); - } catch (e) { - if (e instanceof Error) { - if (e.message.indexOf('not in a delayed state') !== -1) { - throw e; - } - } else { - throw e; - } - } - } - break; - - case 'inbox': - delayedQueues = await this.queueService.inboxQueue.getDelayed(); - for (let queueIndex = 0; queueIndex < delayedQueues.length; queueIndex++) { - const queue = delayedQueues[queueIndex]; - try { - await queue.promote(); - } catch (e) { - if (e instanceof Error) { - if (e.message.indexOf('not in a delayed state') !== -1) { - throw e; - } - } else { - throw e; - } - } - } - break; - } - - this.moderationLogService.log(me, 'promoteQueue'); - }); - } -} diff --git a/packages/backend/src/server/api/endpoints/admin/queue/queue-stats.ts b/packages/backend/src/server/api/endpoints/admin/queue/queue-stats.ts new file mode 100644 index 0000000000..10ce48332a --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/queue/queue-stats.ts @@ -0,0 +1,36 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import { Endpoint } from '@/server/api/endpoint-base.js'; +import { ModerationLogService } from '@/core/ModerationLogService.js'; +import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js'; + +export const meta = { + tags: ['admin'], + + requireCredential: true, + requireModerator: true, + kind: 'read:admin:queue', +} as const; + +export const paramDef = { + type: 'object', + properties: { + queue: { type: 'string', enum: QUEUE_TYPES }, + }, + required: ['queue'], +} as const; + +@Injectable() +export default class extends Endpoint { // eslint-disable-line import/no-default-export + constructor( + private queueService: QueueService, + ) { + super(meta, paramDef, async (ps, me) => { + return this.queueService.queueGetQueue(ps.queue); + }); + } +} diff --git a/packages/backend/src/server/api/endpoints/admin/queue/queues.ts b/packages/backend/src/server/api/endpoints/admin/queue/queues.ts new file mode 100644 index 0000000000..3a38275f60 --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/queue/queues.ts @@ -0,0 +1,35 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import { Endpoint } from '@/server/api/endpoint-base.js'; +import { ModerationLogService } from '@/core/ModerationLogService.js'; +import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js'; + +export const meta = { + tags: ['admin'], + + requireCredential: true, + requireModerator: true, + kind: 'read:admin:queue', +} as const; + +export const paramDef = { + type: 'object', + properties: { + }, + required: [], +} as const; + +@Injectable() +export default class extends Endpoint { // eslint-disable-line import/no-default-export + constructor( + private queueService: QueueService, + ) { + super(meta, paramDef, async (ps, me) => { + return this.queueService.queueGetQueues(); + }); + } +} diff --git a/packages/backend/src/server/api/endpoints/admin/queue/remove-job.ts b/packages/backend/src/server/api/endpoints/admin/queue/remove-job.ts new file mode 100644 index 0000000000..2c73f689d0 --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/queue/remove-job.ts @@ -0,0 +1,38 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import { Endpoint } from '@/server/api/endpoint-base.js'; +import { ModerationLogService } from '@/core/ModerationLogService.js'; +import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js'; + +export const meta = { + tags: ['admin'], + + requireCredential: true, + requireModerator: true, + kind: 'write:admin:queue', +} as const; + +export const paramDef = { + type: 'object', + properties: { + queue: { type: 'string', enum: QUEUE_TYPES }, + jobId: { type: 'string' }, + }, + required: ['queue', 'jobId'], +} as const; + +@Injectable() +export default class extends Endpoint { // eslint-disable-line import/no-default-export + constructor( + private moderationLogService: ModerationLogService, + private queueService: QueueService, + ) { + super(meta, paramDef, async (ps, me) => { + this.queueService.queueRemoveJob(ps.queue, ps.jobId); + }); + } +} diff --git a/packages/backend/src/server/api/endpoints/admin/queue/retry-job.ts b/packages/backend/src/server/api/endpoints/admin/queue/retry-job.ts new file mode 100644 index 0000000000..b2603128f8 --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/queue/retry-job.ts @@ -0,0 +1,38 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import { Endpoint } from '@/server/api/endpoint-base.js'; +import { ModerationLogService } from '@/core/ModerationLogService.js'; +import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js'; + +export const meta = { + tags: ['admin'], + + requireCredential: true, + requireModerator: true, + kind: 'write:admin:queue', +} as const; + +export const paramDef = { + type: 'object', + properties: { + queue: { type: 'string', enum: QUEUE_TYPES }, + jobId: { type: 'string' }, + }, + required: ['queue', 'jobId'], +} as const; + +@Injectable() +export default class extends Endpoint { // eslint-disable-line import/no-default-export + constructor( + private moderationLogService: ModerationLogService, + private queueService: QueueService, + ) { + super(meta, paramDef, async (ps, me) => { + this.queueService.queueRetryJob(ps.queue, ps.jobId); + }); + } +} diff --git a/packages/backend/src/server/api/endpoints/admin/queue/show-job.ts b/packages/backend/src/server/api/endpoints/admin/queue/show-job.ts new file mode 100644 index 0000000000..63747b5540 --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/queue/show-job.ts @@ -0,0 +1,38 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import { Endpoint } from '@/server/api/endpoint-base.js'; +import { ModerationLogService } from '@/core/ModerationLogService.js'; +import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js'; + +export const meta = { + tags: ['admin'], + + requireCredential: true, + requireModerator: true, + kind: 'read:admin:queue', +} as const; + +export const paramDef = { + type: 'object', + properties: { + queue: { type: 'string', enum: QUEUE_TYPES }, + jobId: { type: 'string' }, + }, + required: ['queue', 'jobId'], +} as const; + +@Injectable() +export default class extends Endpoint { // eslint-disable-line import/no-default-export + constructor( + private moderationLogService: ModerationLogService, + private queueService: QueueService, + ) { + super(meta, paramDef, async (ps, me) => { + return this.queueService.queueGetJob(ps.queue, ps.jobId); + }); + } +} diff --git a/packages/frontend/src/components/MkCode.vue b/packages/frontend/src/components/MkCode.vue index d2d9f320ee..f41cb0d00b 100644 --- a/packages/frontend/src/components/MkCode.vue +++ b/packages/frontend/src/components/MkCode.vue @@ -12,8 +12,8 @@ SPDX-License-Identifier: AGPL-3.0-only - -
{{ code }}
+ +
{{ code }}