diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index b10b8e5899..6e3eb29545 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -16,11 +16,13 @@ import { RelationshipJobData, UserWebhookDeliverJobData, SystemWebhookDeliverJobData, + DeleteUserMutingsJobData, } from '../queue/types.js'; import type { Provider } from '@nestjs/common'; export type SystemQueue = Bull.Queue>; export type EndedPollNotificationQueue = Bull.Queue; +export type DeleteUserMutingQueue = Bull.Queue; export type DeliverQueue = Bull.Queue; export type InboxQueue = Bull.Queue; export type DbQueue = Bull.Queue; @@ -41,6 +43,12 @@ const $endedPollNotification: Provider = { inject: [DI.config], }; +const $deleteUserMuting: Provider = { + provide: 'queue:deleteUserMuting', + useFactory: (config: Config) => new Bull.Queue(QUEUE.DELETE_USER_MUTING, baseQueueOptions(config, QUEUE.DELETE_USER_MUTING)), + inject: [DI.config], +}; + const $deliver: Provider = { provide: 'queue:deliver', useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)), @@ -89,6 +97,7 @@ const $systemWebhookDeliver: Provider = { providers: [ $system, $endedPollNotification, + $deleteUserMuting, $deliver, $inbox, $db, @@ -100,6 +109,7 @@ const $systemWebhookDeliver: Provider = { exports: [ $system, $endedPollNotification, + $deleteUserMuting, $deliver, $inbox, $db, @@ -113,6 +123,7 @@ export class QueueModule implements OnApplicationShutdown { constructor( @Inject('queue:system') public systemQueue: SystemQueue, @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, + @Inject('queue:deleteUserMuting') public deleteUserMutingQueue: DeleteUserMutingQueue, @Inject('queue:deliver') public deliverQueue: DeliverQueue, @Inject('queue:inbox') public inboxQueue: InboxQueue, @Inject('queue:db') public dbQueue: DbQueue, @@ -129,6 +140,7 @@ export class QueueModule implements OnApplicationShutdown { await Promise.all([ this.systemQueue.close(), this.endedPollNotificationQueue.close(), + this.deleteUserMutingQueue.close(), this.deliverQueue.close(), this.inboxQueue.close(), this.dbQueue.close(), diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 4be568b334..cea0a400c5 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -31,6 +31,7 @@ import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, + DeleteUserMutingQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, @@ -44,6 +45,7 @@ import type * as Bull from 'bullmq'; export const QUEUE_TYPES = [ 'system', 'endedPollNotification', + 'deleteUserMuting', 'deliver', 'inbox', 'db', @@ -70,18 +72,16 @@ const REPEATABLE_SYSTEM_JOB_DEF = [{ pattern: '0 0 * * *', }, { name: 'checkExpiredMutings', - pattern: '*/5 * * * *', + pattern: '0 */3 * * *', // 3時間ごと }, { name: 'bakeBufferedReactions', pattern: '0 0 * * *', }, { name: 'checkModeratorsActivity', - // 毎時30分に起動 - pattern: '30 * * * *', + pattern: '30 * * * *', // 毎時30分に起動 }, { name: 'cleanRemoteNotes', - // 毎日午前4時に起動(最も人の少ない時間帯) - pattern: '0 4 * * *', + pattern: '0 4 * * *', // 毎日午前4時に起動(最も人の少ない時間帯) }]; @Injectable() @@ -92,6 +92,7 @@ export class QueueService { @Inject('queue:system') public systemQueue: SystemQueue, @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, + @Inject('queue:deleteUserMutingQueue') public deleteUserMutingQueue: DeleteUserMutingQueue, @Inject('queue:deliver') public deliverQueue: DeliverQueue, @Inject('queue:inbox') public inboxQueue: InboxQueue, @Inject('queue:db') public dbQueue: DbQueue, @@ -716,6 +717,7 @@ export class QueueService { switch (type) { case 'system': return this.systemQueue; case 'endedPollNotification': return this.endedPollNotificationQueue; + case 'deleteUserMuting': return this.deleteUserMutingQueue; case 'deliver': return this.deliverQueue; case 'inbox': return this.inboxQueue; case 'db': return this.dbQueue; diff --git a/packages/backend/src/core/UserMutingService.ts b/packages/backend/src/core/UserMutingService.ts index 06643be5fb..d19a4221e4 100644 --- a/packages/backend/src/core/UserMutingService.ts +++ b/packages/backend/src/core/UserMutingService.ts @@ -11,6 +11,7 @@ import type { MiUser } from '@/models/User.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import { CacheService } from '@/core/CacheService.js'; +import { QueueService } from '@/core/QueueService.js'; @Injectable() export class UserMutingService { @@ -20,12 +21,13 @@ export class UserMutingService { private idService: IdService, private cacheService: CacheService, + private queueService: QueueService, ) { } @bindThis public async mute(user: MiUser, target: MiUser, expiresAt: Date | null = null): Promise { - await this.mutingsRepository.insert({ + const inserted = await this.mutingsRepository.insertOne({ id: this.idService.gen(), expiresAt: expiresAt ?? null, muterId: user.id, @@ -33,6 +35,23 @@ export class UserMutingService { }); this.cacheService.userMutingsCache.refresh(user.id); + + if (expiresAt != null) { + const delay = expiresAt.getTime() - Date.now(); + this.queueService.deleteUserMutingQueue.add(inserted.id, { + mutingId: inserted.id, + }, { + delay, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, + }); + } } @bindThis @@ -48,4 +67,25 @@ export class UserMutingService { this.cacheService.userMutingsCache.refresh(muterId); } } + + @bindThis + public async deleteExpired(): Promise { + const expired = await this.mutingsRepository.createQueryBuilder('muting') + .where('muting.expiresAt IS NOT NULL') + .andWhere('muting.expiresAt < :now', { now: new Date() }) + .innerJoinAndSelect('muting.mutee', 'mutee') + .getMany(); + + if (expired.length > 0) { + await this.unmute(expired); + } + } + + @bindThis + public async unmuteById(mutingId: MiMuting['id']): Promise { + const muting = await this.mutingsRepository.findOneBy({ id: mutingId }); + if (muting == null) return; + + await this.unmute([muting]); + } } diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 7b64182754..1b19aab003 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -14,6 +14,7 @@ import { CheckModeratorsActivityProcessorService } from '@/queue/processors/Chec import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; +import { DeleteUserMutingsProcessorService } from './processors/DeleteUserMutingsProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; import { InboxProcessorService } from './processors/InboxProcessorService.js'; import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js'; @@ -85,6 +86,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private relationshipQueueWorker: Bull.Worker; private objectStorageQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker; + private deleteUserMutingQueueWorker: Bull.Worker; constructor( @Inject(DI.config) @@ -94,6 +96,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private userWebhookDeliverProcessorService: UserWebhookDeliverProcessorService, private systemWebhookDeliverProcessorService: SystemWebhookDeliverProcessorService, private endedPollNotificationProcessorService: EndedPollNotificationProcessorService, + private deleteUserMutingsProcessorService: DeleteUserMutingsProcessorService, private deliverProcessorService: DeliverProcessorService, private inboxProcessorService: InboxProcessorService, private deleteDriveFilesProcessorService: DeleteDriveFilesProcessorService, @@ -520,6 +523,21 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } //#endregion + + //#region delete user muting + { + this.deleteUserMutingQueueWorker = new Bull.Worker(QUEUE.DELETE_USER_MUTING, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: DeleteUserMutings' }, () => this.deleteUserMutingsProcessorService.process(job)); + } else { + return this.deleteUserMutingsProcessorService.process(job); + } + }, { + ...baseWorkerOptions(this.config, QUEUE.DELETE_USER_MUTING), + autorun: false, + }); + } + //#endregion } @bindThis @@ -534,6 +552,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.relationshipQueueWorker.run(), this.objectStorageQueueWorker.run(), this.endedPollNotificationQueueWorker.run(), + this.deleteUserMutingQueueWorker.run(), ]); } @@ -549,6 +568,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.relationshipQueueWorker.close(), this.objectStorageQueueWorker.close(), this.endedPollNotificationQueueWorker.close(), + this.deleteUserMutingQueueWorker.close(), ]); } diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 7e146a7e03..0b3936ea2b 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -12,6 +12,7 @@ export const QUEUE = { INBOX: 'inbox', SYSTEM: 'system', ENDED_POLL_NOTIFICATION: 'endedPollNotification', + DELETE_USER_MUTING: 'deleteUserMuting', DB: 'db', RELATIONSHIP: 'relationship', OBJECT_STORAGE: 'objectStorage', diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts index 448fc9c763..4a7b3c847c 100644 --- a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts @@ -3,24 +3,17 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import { Inject, Injectable } from '@nestjs/common'; -import { In } from 'typeorm'; -import { DI } from '@/di-symbols.js'; -import type { MutingsRepository } from '@/models/_.js'; +import { Injectable } from '@nestjs/common'; import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; import { UserMutingService } from '@/core/UserMutingService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type * as Bull from 'bullmq'; @Injectable() export class CheckExpiredMutingsProcessorService { private logger: Logger; constructor( - @Inject(DI.mutingsRepository) - private mutingsRepository: MutingsRepository, - private userMutingService: UserMutingService, private queueLoggerService: QueueLoggerService, ) { @@ -31,15 +24,7 @@ export class CheckExpiredMutingsProcessorService { public async process(): Promise { this.logger.info('Checking expired mutings...'); - const expired = await this.mutingsRepository.createQueryBuilder('muting') - .where('muting.expiresAt IS NOT NULL') - .andWhere('muting.expiresAt < :now', { now: new Date() }) - .innerJoinAndSelect('muting.mutee', 'mutee') - .getMany(); - - if (expired.length > 0) { - await this.userMutingService.unmute(expired); - } + await this.userMutingService.deleteExpired(); this.logger.succ('All expired mutings checked.'); } diff --git a/packages/backend/src/queue/processors/DeleteUserMutingsProcessorService.ts b/packages/backend/src/queue/processors/DeleteUserMutingsProcessorService.ts new file mode 100644 index 0000000000..c4e2ce60e8 --- /dev/null +++ b/packages/backend/src/queue/processors/DeleteUserMutingsProcessorService.ts @@ -0,0 +1,29 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import { UserMutingService } from '@/core/UserMutingService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DeleteUserMutingsJobData } from '../types.js'; + +@Injectable() +export class DeleteUserMutingsProcessorService { + private logger: Logger; + + constructor( + private userMutingService: UserMutingService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('delete-user-mutings'); + } + + @bindThis + public async process(job: Bull.Job): Promise { + await this.userMutingService.unmuteById(job.data.mutingId); + } +} diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 757daea88b..45e15723f5 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -109,6 +109,10 @@ export type EndedPollNotificationJobData = { noteId: MiNote['id']; }; +export type DeleteUserMutingsJobData = { + mutingId: string; +}; + export type SystemWebhookDeliverJobData = { type: T; content: SystemWebhookPayload; diff --git a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts index d7f9e4eaa3..c2ce2c4f84 100644 --- a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts +++ b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts @@ -5,7 +5,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { Endpoint } from '@/server/api/endpoint-base.js'; -import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, SystemQueue, UserWebhookDeliverQueue, SystemWebhookDeliverQueue } from '@/core/QueueModule.js'; +import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, SystemQueue, UserWebhookDeliverQueue, SystemWebhookDeliverQueue, DeleteUserMutingQueue } from '@/core/QueueModule.js'; export const meta = { tags: ['admin'], @@ -49,6 +49,7 @@ export default class extends Endpoint { // eslint- constructor( @Inject('queue:system') public systemQueue: SystemQueue, @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, + @Inject('queue:deleteUserMuting') public deleteUserMutingQueue: DeleteUserMutingQueue, @Inject('queue:deliver') public deliverQueue: DeliverQueue, @Inject('queue:inbox') public inboxQueue: InboxQueue, @Inject('queue:db') public dbQueue: DbQueue, diff --git a/packages/backend/src/server/web/ClientServerService.ts b/packages/backend/src/server/web/ClientServerService.ts index 768cfde701..b515a0c0c8 100644 --- a/packages/backend/src/server/web/ClientServerService.ts +++ b/packages/backend/src/server/web/ClientServerService.ts @@ -20,17 +20,6 @@ import type { Config } from '@/config.js'; import { getNoteSummary } from '@/misc/get-note-summary.js'; import { DI } from '@/di-symbols.js'; import * as Acct from '@/misc/acct.js'; -import type { - DbQueue, - DeliverQueue, - EndedPollNotificationQueue, - InboxQueue, - ObjectStorageQueue, - RelationshipQueue, - SystemQueue, - UserWebhookDeliverQueue, - SystemWebhookDeliverQueue, -} from '@/core/QueueModule.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; import { PageEntityService } from '@/core/entities/PageEntityService.js'; @@ -129,16 +118,6 @@ export class ClientServerService { private feedService: FeedService, private roleService: RoleService, private clientLoggerService: ClientLoggerService, - - @Inject('queue:system') public systemQueue: SystemQueue, - @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, - @Inject('queue:deliver') public deliverQueue: DeliverQueue, - @Inject('queue:inbox') public inboxQueue: InboxQueue, - @Inject('queue:db') public dbQueue: DbQueue, - @Inject('queue:relationship') public relationshipQueue: RelationshipQueue, - @Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue, - @Inject('queue:userWebhookDeliver') public userWebhookDeliverQueue: UserWebhookDeliverQueue, - @Inject('queue:systemWebhookDeliver') public systemWebhookDeliverQueue: SystemWebhookDeliverQueue, ) { //this.createServer = this.createServer.bind(this); }