From ae0b237e3b0cc897cf993aca6b19cd90caeee287 Mon Sep 17 00:00:00 2001 From: Hidekazu Kobayashi Date: Mon, 16 Sep 2024 11:35:22 +0000 Subject: [PATCH] Perform deferred jobs on shutdown --- .../backend/src/core/NoteCreateService.ts | 7 ++-- packages/backend/src/misc/collapsed-queue.ts | 32 +++++++++++++------ .../queue/processors/InboxProcessorService.ts | 18 ++++++++--- 3 files changed, 41 insertions(+), 16 deletions(-) diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index bd9d828357..31070e5f09 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -1050,12 +1050,13 @@ export class NoteCreateService implements OnApplicationShutdown { } @bindThis - public dispose(): void { + public async dispose(): Promise { this.#shutdownController.abort(); + await this.updateNotesCountQueue.performAllNow(); } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + public async onApplicationShutdown(signal?: string | undefined): Promise { + await this.dispose(); } } diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index 1d2750b19c..dee6df32bd 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -3,27 +3,41 @@ * SPDX-License-Identifier: AGPL-3.0-only */ +type Job = { + value: V; + timer: NodeJS.Timeout; +}; + export class CollapsedQueue { - private jobs: Map = new Map(); + private jobs: Map> = new Map(); constructor( private timeout: number, private collapse: (oldValue: V, newValue: V) => V, - private doJob: (key: K, value: V) => void, - ) { } + private perform: (key: K, value: V) => Promise, + ) {} enqueue(key: K, value: V) { if (this.jobs.has(key)) { const old = this.jobs.get(key)!; - const merged = this.collapse(old, value); - this.jobs.set(key, merged); + const merged = this.collapse(old.value, value); + this.jobs.set(key, { ...old, value: merged }); } else { - this.jobs.set(key, value); - setTimeout(() => { - const value = this.jobs.get(key)!; + const timer = setTimeout(() => { + const job = this.jobs.get(key)!; this.jobs.delete(key); - this.doJob(key, value); + this.perform(key, job.value); }, this.timeout); + this.jobs.set(key, { value, timer }); } } + + async performAllNow() { + const entries = [...this.jobs.entries()]; + this.jobs.clear(); + for (const [_key, job] of entries) { + clearTimeout(job.timer); + } + await Promise.allSettled(entries.map(([key, job]) => this.perform(key, job.value))); + } } diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 69f868fcc8..fa929a4d77 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { Injectable } from '@nestjs/common'; +import { Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -26,10 +26,10 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type { InboxJobData } from '../types.js'; import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type { InboxJobData } from '../types.js'; type UpdateInstanceJob = { latestRequestReceivedAt: Date, @@ -37,7 +37,7 @@ type UpdateInstanceJob = { }; @Injectable() -export class InboxProcessorService { +export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; private updateInstanceQueue: CollapsedQueue; @@ -254,4 +254,14 @@ export class InboxProcessorService { suspensionState: job.shouldUnsuspend ? 'none' : undefined, }); } + + @bindThis + public async dispose(): Promise { + await this.updateInstanceQueue.performAllNow(); + } + + @bindThis + async onApplicationShutdown(signal?: string) { + await this.dispose(); + } }