diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 18efc9d562..89e3eafa0e 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -55,6 +55,7 @@ import { UserBlockingService } from '@/core/UserBlockingService.js'; import { isReply } from '@/misc/is-reply.js'; import { trackPromise } from '@/misc/promise-tracker.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; type NotificationType = 'reply' | 'renote' | 'quote' | 'mention'; @@ -146,6 +147,7 @@ type Option = { @Injectable() export class NoteCreateService implements OnApplicationShutdown { #shutdownController = new AbortController(); + private updateNotesCountQueue: CollapsedQueue; constructor( @Inject(DI.config) @@ -215,7 +217,9 @@ export class NoteCreateService implements OnApplicationShutdown { private instanceChart: InstanceChart, private utilityService: UtilityService, private userBlockingService: UserBlockingService, - ) { } + ) { + this.updateNotesCountQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseNotesCount, this.performUpdateNotesCount); + } @bindThis public async create(user: { @@ -509,7 +513,7 @@ export class NoteCreateService implements OnApplicationShutdown { // Register host if (this.userEntityService.isRemoteUser(user)) { this.federatedInstanceService.fetch(user.host).then(async i => { - this.instancesRepository.increment({ id: i.id }, 'notesCount', 1); + this.updateNotesCountQueue.enqueue(i.id, 1); if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateNote(i.host, note, true); } @@ -1028,12 +1032,23 @@ export class NoteCreateService implements OnApplicationShutdown { } @bindThis - public dispose(): void { - this.#shutdownController.abort(); + private collapseNotesCount(oldValue: number, newValue: number) { + return oldValue + newValue; } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + private async performUpdateNotesCount(id: MiNote['id'], incrBy: number) { + await this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); + } + + @bindThis + public async dispose(): Promise { + this.#shutdownController.abort(); + await this.updateNotesCountQueue.performAllNow(); + } + + @bindThis + public async onApplicationShutdown(signal?: string | undefined): Promise { + await this.dispose(); } } diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts new file mode 100644 index 0000000000..5bc20a78ae --- /dev/null +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -0,0 +1,44 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +type Job = { + value: V; + timer: NodeJS.Timeout; +}; + +// TODO: redis使えるようにする +export class CollapsedQueue { + private jobs: Map> = new Map(); + + constructor( + private timeout: number, + private collapse: (oldValue: V, newValue: V) => V, + private perform: (key: K, value: V) => Promise, + ) {} + + enqueue(key: K, value: V) { + if (this.jobs.has(key)) { + const old = this.jobs.get(key)!; + const merged = this.collapse(old.value, value); + this.jobs.set(key, { ...old, value: merged }); + } else { + const timer = setTimeout(() => { + const job = this.jobs.get(key)!; + this.jobs.delete(key); + this.perform(key, job.value); + }, this.timeout); + this.jobs.set(key, { value, timer }); + } + } + + async performAllNow() { + const entries = [...this.jobs.entries()]; + this.jobs.clear(); + for (const [_key, job] of entries) { + clearTimeout(job.timer); + } + await Promise.allSettled(entries.map(([key, job]) => this.perform(key, job.value))); + } +} diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 2df37bedf4..68999b5d17 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -25,14 +25,22 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type { InboxJobData } from '../types.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type { InboxJobData } from '../types.js'; + +type UpdateInstanceJob = { + latestRequestReceivedAt: Date, + shouldUnsuspend: boolean, +}; @Injectable() -export class InboxProcessorService { +export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; + private updateInstanceQueue: CollapsedQueue; constructor( @Inject(DI.meta) @@ -51,6 +59,7 @@ export class InboxProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); + this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -187,11 +196,9 @@ export class InboxProcessorService { // Update stats this.federatedInstanceService.fetch(authUser.user.host).then(i => { - this.federatedInstanceService.update(i.id, { + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), - isNotResponding: false, - // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる - suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined, + shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); @@ -227,4 +234,36 @@ export class InboxProcessorService { } return 'ok'; } + + @bindThis + public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { + const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt + ? newJob.latestRequestReceivedAt + : oldJob.latestRequestReceivedAt; + const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; + return { + latestRequestReceivedAt, + shouldUnsuspend, + }; + } + + @bindThis + public async performUpdateInstance(id: string, job: UpdateInstanceJob) { + await this.federatedInstanceService.update(id, { + latestRequestReceivedAt: new Date(), + isNotResponding: false, + // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }); + } + + @bindThis + public async dispose(): Promise { + await this.updateInstanceQueue.performAllNow(); + } + + @bindThis + async onApplicationShutdown(signal?: string) { + await this.dispose(); + } }