From 90b96093411250df81bded348074ea16d32d32c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A5=BA=E5=AD=90w=20=28Yumechi=29?= <35571479+eternal-flame-AD@users.noreply.github.com> Date: Thu, 14 Aug 2025 07:54:28 +0000 Subject: [PATCH] enhance: performance for CleanRemoteNotesProcessorService (#16404) * enhance: performance for CleanRemoteNotesProcessorService Signed-off-by: eternal-flame-AD * suggestions Signed-off-by: eternal-flame-AD * docs Signed-off-by: eternal-flame-AD * change initial limit to 100 Signed-off-by: eternal-flame-AD * robustness for transient race conditions Signed-off-by: eternal-flame-AD * handle cursors in postgres Signed-off-by: eternal-flame-AD * robustness: transient errors and timeout handling Signed-off-by: eternal-flame-AD * use '0' as initial cursor Signed-off-by: eternal-flame-AD --------- Signed-off-by: eternal-flame-AD --- .../CleanRemoteNotesProcessorService.ts | 272 ++++++++++++------ .../CleanRemoteNotesProcessorService.ts | 3 + 2 files changed, 181 insertions(+), 94 deletions(-) diff --git a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts index da3bb804c2..77a9dc5557 100644 --- a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts @@ -5,6 +5,7 @@ import { setTimeout } from 'node:timers/promises'; import { Inject, Injectable } from '@nestjs/common'; +import { DataSource, IsNull, LessThan, QueryFailedError, Not } from 'typeorm'; import { DI } from '@/di-symbols.js'; import type { MiMeta, MiNote, NotesRepository } from '@/models/_.js'; import type Logger from '@/logger.js'; @@ -24,18 +25,31 @@ export class CleanRemoteNotesProcessorService { @Inject(DI.notesRepository) private notesRepository: NotesRepository, + @Inject(DI.db) + private db: DataSource, + private idService: IdService, private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('clean-remote-notes'); } + @bindThis + private computeProgress(minId: string, maxId: string, cursorLeft: string) { + const minTs = this.idService.parse(minId).date.getTime(); + const maxTs = this.idService.parse(maxId).date.getTime(); + const cursorTs = this.idService.parse(cursorLeft).date.getTime(); + + return ((cursorTs - minTs) / (maxTs - minTs)) * 100; + } + @bindThis public async process(job: Bull.Job>): Promise<{ deletedCount: number; oldest: number | null; newest: number | null; - skipped?: boolean; + skipped: boolean; + transientErrors: number; }> { if (!this.meta.enableRemoteNotesCleaning) { this.logger.info('Remote notes cleaning is disabled, skipping...'); @@ -44,6 +58,7 @@ export class CleanRemoteNotesProcessorService { oldest: null, newest: null, skipped: true, + transientErrors: 0, }; } @@ -52,12 +67,10 @@ export class CleanRemoteNotesProcessorService { const maxDuration = this.meta.remoteNotesCleaningMaxProcessingDurationInMinutes * 60 * 1000; // Convert minutes to milliseconds const startAt = Date.now(); - const MAX_NOTE_COUNT_PER_QUERY = 50; - - //#retion queries - // We use string literals instead of query builder for several reasons: - // - for removeCondition, we need to use it in having clause, which is not supported by Brackets. - // - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query + //#region queries + // The date limit for the newest note to be considered for deletion. + // All notes newer than this limit will always be retained. + const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes)); // The condition for removing the notes. // The note must be: @@ -66,56 +79,93 @@ export class CleanRemoteNotesProcessorService { // - not have clipped // - not have pinned on the user profile // - not has been favorite by any user - const removeCondition = 'note.id < :newestLimit' - + ' AND note."clippedCount" = 0' - + ' AND note."userHost" IS NOT NULL' - // using both userId and noteId instead of just noteId to use index on user_note_pining table. - // This is safe because notes are only pinned by the user who created them. - + ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")' - // We cannot use userId trick because users can favorite notes from other users. - + ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")' - ; + const removalCriteria = [ + 'note."id" < :newestLimit', + 'note."clippedCount" = 0', + 'note."userHost" IS NOT NULL', + 'NOT EXISTS (SELECT 1 FROM user_note_pining WHERE "noteId" = note."id")', + 'NOT EXISTS (SELECT 1 FROM note_favorite WHERE "noteId" = note."id")', + ].join(' AND '); - // The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes - const initiatorQuery = this.notesRepository.createQueryBuilder('note') + const minId = (await this.notesRepository.createQueryBuilder('note') + .select('MIN(note.id)', 'minId') + .where({ + id: LessThan(newestLimit), + userHost: Not(IsNull()), + replyId: IsNull(), + renoteId: IsNull(), + }) + .getRawOne<{ minId?: MiNote['id'] }>())?.minId; + + if (!minId) { + this.logger.info('No notes can possibly be deleted, skipping...'); + return { + deletedCount: 0, + oldest: null, + newest: null, + skipped: false, + transientErrors: 0, + }; + } + + // start with a conservative limit and adjust it based on the query duration + const minimumLimit = 10; + let currentLimit = 100; + let cursorLeft = '0'; + + const candidateNotesCteName = 'candidate_notes'; + + // tree walk down all root notes, short-circuit when the first unremovable note is found + const candidateNotesQueryBase = this.notesRepository.createQueryBuilder('note') + .select('note."id"', 'id') + .addSelect('note."replyId"', 'replyId') + .addSelect('note."renoteId"', 'renoteId') + .addSelect('note."id"', 'rootId') + .addSelect('TRUE', 'isRemovable') + .addSelect('TRUE', 'isBase') + .where('note."id" > :cursorLeft') + .andWhere(removalCriteria) + .andWhere({ replyId: IsNull(), renoteId: IsNull() }); + + const candidateNotesQueryInductive = this.notesRepository.createQueryBuilder('note') .select('note.id', 'id') - .where(removeCondition) - .andWhere('note.id > :cursor') - .orderBy('note.id', 'ASC') - .limit(MAX_NOTE_COUNT_PER_QUERY); + .addSelect('note."replyId"', 'replyId') + .addSelect('note."renoteId"', 'renoteId') + .addSelect('parent."rootId"', 'rootId') + .addSelect(removalCriteria, 'isRemovable') + .addSelect('FALSE', 'isBase') + .innerJoin(candidateNotesCteName, 'parent', 'parent."id" = note."replyId" OR parent."id" = note."renoteId"') + .where('parent."isRemovable" = TRUE'); - // The union query queries the related notes and replies related to the initiator query - const unionQuery = ` - SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId" - FROM "note" "note" - INNER JOIN "related_notes" "rn" - ON "note"."replyId" = rn.id - OR "note"."renoteId" = rn.id - OR "note"."id" = rn."replyId" - OR "note"."id" = rn."renoteId" - `; - - const selectRelatedNotesFromInitiatorIdsQuery = ` - SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId" - FROM "note" "note" WHERE "note"."id" IN (:...initiatorIds) - `; - - const recursiveQuery = `(${selectRelatedNotesFromInitiatorIdsQuery}) UNION (${unionQuery})`; - - const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note') - .select('rn."initiatorId"') - .innerJoin('related_notes', 'rn', 'note.id = rn.id') - .groupBy('rn."initiatorId"') - .having(`bool_and(${removeCondition})`); - - const notesQuery = this.notesRepository.createQueryBuilder('note') - .addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true }) - .select('note.id', 'id') - .addSelect('rn."initiatorId"') - .innerJoin('related_notes', 'rn', 'note.id = rn.id') - .where(`rn."initiatorId" IN (${removableInitiatorNotesQuery.getQuery()})`) - .distinctOn(['note.id']); - //#endregion + // A note tree can be deleted if there are no unremovable rows with the same rootId. + // + // `candidate_notes` will have the following structure after recursive query (some columns omitted): + // After performing a LEFT JOIN with `candidate_notes` as `unremovable`, + // the note tree containing unremovable notes will be anti-joined. + // For removable rows, the `unremovable` columns will have `NULL` values. + // | id | rootId | isRemovable | + // |-----|--------|-------------| + // | aaa | aaa | TRUE | + // | bbb | aaa | FALSE | + // | ccc | aaa | FALSE | + // | ddd | ddd | TRUE | + // | eee | ddd | TRUE | + // | fff | fff | TRUE | + // | ggg | ggg | FALSE | + // + const candidateNotesQuery = this.db.createQueryBuilder() + .select(`"${candidateNotesCteName}"."id"`, 'id') + .addSelect('unremovable."id" IS NULL', 'isRemovable') + .addSelect(`BOOL_OR("${candidateNotesCteName}"."isBase")`, 'isBase') + .addCommonTableExpression( + `((SELECT "base".* FROM (${candidateNotesQueryBase.orderBy('note.id', 'ASC').limit(currentLimit).getQuery()}) AS "base") UNION ${candidateNotesQueryInductive.getQuery()})`, + candidateNotesCteName, + { recursive: true }, + ) + .from(candidateNotesCteName, candidateNotesCteName) + .leftJoin(candidateNotesCteName, 'unremovable', `unremovable."rootId" = "${candidateNotesCteName}"."rootId" AND unremovable."isRemovable" = FALSE`) + .groupBy(`"${candidateNotesCteName}"."id"`) + .addGroupBy('unremovable."id" IS NULL'); const stats = { deletedCount: 0, @@ -123,74 +173,107 @@ export class CleanRemoteNotesProcessorService { newest: null as number | null, }; - // The date limit for the newest note to be considered for deletion. - // All notes newer than this limit will always be retained. - const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes)); - - let cursor = '0'; // oldest note ID to start from - - while (true) { + let lowThroughputWarned = false; + let transientErrors = 0; + for (;;) { //#region check time const batchBeginAt = Date.now(); const elapsed = batchBeginAt - startAt; + const progress = this.computeProgress(minId, newestLimit, cursorLeft > minId ? cursorLeft : minId); + if (elapsed >= maxDuration) { - this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`); - job.log('Reached maximum duration, stopping cleaning.'); + job.log(`Reached maximum duration of ${maxDuration}ms, stopping... (last cursor: ${cursorLeft}, final progress ${progress}%)`); job.updateProgress(100); break; } - job.updateProgress((elapsed / maxDuration) * 100); + const wallClockUsage = elapsed / maxDuration; + if (wallClockUsage > 0.5 && progress < 50 && !lowThroughputWarned) { + const msg = `Not projected to finish in time! (wall clock usage ${wallClockUsage * 100}% at ${progress}%, current limit ${currentLimit})`; + this.logger.warn(msg); + job.log(msg); + lowThroughputWarned = true; + } + job.updateProgress(progress); //#endregion - // First, we fetch the initiator notes that are older than the newestLimit. - const initiatorNotes: { id: MiNote['id'] }[] = await initiatorQuery.setParameters({ cursor, newestLimit }).getRawMany(); + const queryBegin = performance.now(); + let noteIds = null; - // update the cursor to the newest initiatorId found in the fetched notes. - const newCursor = initiatorNotes.reduce((max, note) => note.id > max ? note.id : max, cursor); + try { + noteIds = await candidateNotesQuery.setParameters( + { newestLimit, cursorLeft }, + ).getRawMany<{ id: MiNote['id'], isRemovable: boolean, isBase: boolean }>(); + } catch (e) { + if (currentLimit > minimumLimit && e instanceof QueryFailedError && e.driverError?.code === '57014') { + // Statement timeout (maybe suddenly hit a large note tree), reduce the limit and try again + // continuous failures will eventually converge to currentLimit == minimumLimit and then throw + currentLimit = Math.max(minimumLimit, Math.floor(currentLimit * 0.25)); + continue; + } + throw e; + } - if (initiatorNotes.length === 0 || cursor === newCursor || newCursor >= newestLimit) { - // If no notes were found or the cursor did not change, we can stop. - job.log('No more notes to clean. (no initiator notes found or cursor did not change.)'); + if (noteIds.length === 0) { + job.log('No more notes to clean.'); break; } - const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.setParameters({ - initiatorIds: initiatorNotes.map(note => note.id), - newestLimit, - }).getRawMany(); + const queryDuration = performance.now() - queryBegin; + // try to adjust such that each query takes about 1~5 seconds and reasonable NodeJS heap so the task stays responsive + // this should not oscillate.. + if (queryDuration > 5000 || noteIds.length > 5000) { + currentLimit = Math.floor(currentLimit * 0.5); + } else if (queryDuration < 1000 && noteIds.length < 1000) { + currentLimit = Math.floor(currentLimit * 1.5); + } + // clamp to a sane range + currentLimit = Math.min(Math.max(currentLimit, minimumLimit), 5000); - cursor = newCursor; + const deletableNoteIds = noteIds.filter(result => result.isRemovable).map(result => result.id); + if (deletableNoteIds.length > 0) { + try { + await this.notesRepository.delete(deletableNoteIds); - if (notes.length > 0) { - await this.notesRepository.delete(notes.map(note => note.id)); - - for (const { id } of notes) { - const t = this.idService.parse(id).date.getTime(); - if (stats.oldest === null || t < stats.oldest) { - stats.oldest = t; + for (const id of deletableNoteIds) { + const t = this.idService.parse(id).date.getTime(); + if (stats.oldest === null || t < stats.oldest) { + stats.oldest = t; + } + if (stats.newest === null || t > stats.newest) { + stats.newest = t; + } } - if (stats.newest === null || t > stats.newest) { - stats.newest = t; + + stats.deletedCount += deletableNoteIds.length; + } catch (e) { + // check for integrity violation errors (class 23) that might have occurred between the check and the delete + // we can safely continue to the next batch + if (e instanceof QueryFailedError && e.driverError?.code?.startsWith('23')) { + transientErrors++; + job.log(`Error deleting notes: ${e} (transient race condition?)`); + } else { + throw e; } } - - stats.deletedCount += notes.length; } - job.log(`Deleted ${notes.length} from ${initiatorNotes.length} initiators; ${Date.now() - batchBeginAt}ms`); + cursorLeft = noteIds.filter(result => result.isBase).reduce((max, { id }) => id > max ? id : max, cursorLeft); - if (initiatorNotes.length < MAX_NOTE_COUNT_PER_QUERY) { - // If we fetched less than the maximum, it means there are no more notes to process. - job.log(`No more notes to clean. (fewer than MAX_NOTE_COUNT_PER_QUERY =${MAX_NOTE_COUNT_PER_QUERY}.)`); - break; + job.log(`Deleted ${noteIds.length} notes; ${Date.now() - batchBeginAt}ms`); + + if (process.env.NODE_ENV !== 'test') { + await setTimeout(Math.min(1000 * 5, queryDuration)); // Wait a moment to avoid overwhelming the db } + }; - await setTimeout(1000 * 5); // Wait a moment to avoid overwhelming the db + if (transientErrors > 0) { + const msg = `${transientErrors} transient errors occurred while cleaning remote notes. You may need a second pass to complete the cleaning.`; + this.logger.warn(msg); + job.log(msg); } - this.logger.succ('cleaning of remote notes completed.'); return { @@ -198,6 +281,7 @@ export class CleanRemoteNotesProcessorService { oldest: stats.oldest, newest: stats.newest, skipped: false, + transientErrors, }; } } diff --git a/packages/backend/test/unit/queue/processors/CleanRemoteNotesProcessorService.ts b/packages/backend/test/unit/queue/processors/CleanRemoteNotesProcessorService.ts index 15f8eda865..597d6b90cd 100644 --- a/packages/backend/test/unit/queue/processors/CleanRemoteNotesProcessorService.ts +++ b/packages/backend/test/unit/queue/processors/CleanRemoteNotesProcessorService.ts @@ -158,6 +158,7 @@ describe('CleanRemoteNotesProcessorService', () => { oldest: null, newest: null, skipped: true, + transientErrors: 0, }); }); @@ -172,6 +173,7 @@ describe('CleanRemoteNotesProcessorService', () => { oldest: null, newest: null, skipped: false, + transientErrors: 0, }); }, 3000); @@ -199,6 +201,7 @@ describe('CleanRemoteNotesProcessorService', () => { oldest: expect.any(Number), newest: expect.any(Number), skipped: false, + transientErrors: 0, }); // Check side-by-side from all notes