enhance: performance for CleanRemoteNotesProcessorService (#16404)

* enhance: performance for CleanRemoteNotesProcessorService

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>

* suggestions

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>

* docs

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>

* change initial limit to 100

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>

* robustness for transient race conditions

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>

* handle cursors in postgres

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>

* robustness: transient errors and timeout handling

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>

* use '0' as initial cursor

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>

---------

Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
This commit is contained in:
饺子w (Yumechi) 2025-08-14 07:54:28 +00:00 committed by GitHub
parent c25a922928
commit 90b9609341
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 181 additions and 94 deletions

View File

@ -5,6 +5,7 @@
import { setTimeout } from 'node:timers/promises';
import { Inject, Injectable } from '@nestjs/common';
import { DataSource, IsNull, LessThan, QueryFailedError, Not } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { MiMeta, MiNote, NotesRepository } from '@/models/_.js';
import type Logger from '@/logger.js';
@ -24,18 +25,31 @@ export class CleanRemoteNotesProcessorService {
@Inject(DI.notesRepository)
private notesRepository: NotesRepository,
@Inject(DI.db)
private db: DataSource,
private idService: IdService,
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('clean-remote-notes');
}
@bindThis
private computeProgress(minId: string, maxId: string, cursorLeft: string) {
const minTs = this.idService.parse(minId).date.getTime();
const maxTs = this.idService.parse(maxId).date.getTime();
const cursorTs = this.idService.parse(cursorLeft).date.getTime();
return ((cursorTs - minTs) / (maxTs - minTs)) * 100;
}
@bindThis
public async process(job: Bull.Job<Record<string, unknown>>): Promise<{
deletedCount: number;
oldest: number | null;
newest: number | null;
skipped?: boolean;
skipped: boolean;
transientErrors: number;
}> {
if (!this.meta.enableRemoteNotesCleaning) {
this.logger.info('Remote notes cleaning is disabled, skipping...');
@ -44,6 +58,7 @@ export class CleanRemoteNotesProcessorService {
oldest: null,
newest: null,
skipped: true,
transientErrors: 0,
};
}
@ -52,12 +67,10 @@ export class CleanRemoteNotesProcessorService {
const maxDuration = this.meta.remoteNotesCleaningMaxProcessingDurationInMinutes * 60 * 1000; // Convert minutes to milliseconds
const startAt = Date.now();
const MAX_NOTE_COUNT_PER_QUERY = 50;
//#retion queries
// We use string literals instead of query builder for several reasons:
// - for removeCondition, we need to use it in having clause, which is not supported by Brackets.
// - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query
//#region queries
// The date limit for the newest note to be considered for deletion.
// All notes newer than this limit will always be retained.
const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes));
// The condition for removing the notes.
// The note must be:
@ -66,56 +79,93 @@ export class CleanRemoteNotesProcessorService {
// - not have clipped
// - not have pinned on the user profile
// - not has been favorite by any user
const removeCondition = 'note.id < :newestLimit'
+ ' AND note."clippedCount" = 0'
+ ' AND note."userHost" IS NOT NULL'
// using both userId and noteId instead of just noteId to use index on user_note_pining table.
// This is safe because notes are only pinned by the user who created them.
+ ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")'
// We cannot use userId trick because users can favorite notes from other users.
+ ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")'
;
const removalCriteria = [
'note."id" < :newestLimit',
'note."clippedCount" = 0',
'note."userHost" IS NOT NULL',
'NOT EXISTS (SELECT 1 FROM user_note_pining WHERE "noteId" = note."id")',
'NOT EXISTS (SELECT 1 FROM note_favorite WHERE "noteId" = note."id")',
].join(' AND ');
// The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes
const initiatorQuery = this.notesRepository.createQueryBuilder('note')
const minId = (await this.notesRepository.createQueryBuilder('note')
.select('MIN(note.id)', 'minId')
.where({
id: LessThan(newestLimit),
userHost: Not(IsNull()),
replyId: IsNull(),
renoteId: IsNull(),
})
.getRawOne<{ minId?: MiNote['id'] }>())?.minId;
if (!minId) {
this.logger.info('No notes can possibly be deleted, skipping...');
return {
deletedCount: 0,
oldest: null,
newest: null,
skipped: false,
transientErrors: 0,
};
}
// start with a conservative limit and adjust it based on the query duration
const minimumLimit = 10;
let currentLimit = 100;
let cursorLeft = '0';
const candidateNotesCteName = 'candidate_notes';
// tree walk down all root notes, short-circuit when the first unremovable note is found
const candidateNotesQueryBase = this.notesRepository.createQueryBuilder('note')
.select('note."id"', 'id')
.addSelect('note."replyId"', 'replyId')
.addSelect('note."renoteId"', 'renoteId')
.addSelect('note."id"', 'rootId')
.addSelect('TRUE', 'isRemovable')
.addSelect('TRUE', 'isBase')
.where('note."id" > :cursorLeft')
.andWhere(removalCriteria)
.andWhere({ replyId: IsNull(), renoteId: IsNull() });
const candidateNotesQueryInductive = this.notesRepository.createQueryBuilder('note')
.select('note.id', 'id')
.where(removeCondition)
.andWhere('note.id > :cursor')
.orderBy('note.id', 'ASC')
.limit(MAX_NOTE_COUNT_PER_QUERY);
.addSelect('note."replyId"', 'replyId')
.addSelect('note."renoteId"', 'renoteId')
.addSelect('parent."rootId"', 'rootId')
.addSelect(removalCriteria, 'isRemovable')
.addSelect('FALSE', 'isBase')
.innerJoin(candidateNotesCteName, 'parent', 'parent."id" = note."replyId" OR parent."id" = note."renoteId"')
.where('parent."isRemovable" = TRUE');
// The union query queries the related notes and replies related to the initiator query
const unionQuery = `
SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId"
FROM "note" "note"
INNER JOIN "related_notes" "rn"
ON "note"."replyId" = rn.id
OR "note"."renoteId" = rn.id
OR "note"."id" = rn."replyId"
OR "note"."id" = rn."renoteId"
`;
const selectRelatedNotesFromInitiatorIdsQuery = `
SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId"
FROM "note" "note" WHERE "note"."id" IN (:...initiatorIds)
`;
const recursiveQuery = `(${selectRelatedNotesFromInitiatorIdsQuery}) UNION (${unionQuery})`;
const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note')
.select('rn."initiatorId"')
.innerJoin('related_notes', 'rn', 'note.id = rn.id')
.groupBy('rn."initiatorId"')
.having(`bool_and(${removeCondition})`);
const notesQuery = this.notesRepository.createQueryBuilder('note')
.addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true })
.select('note.id', 'id')
.addSelect('rn."initiatorId"')
.innerJoin('related_notes', 'rn', 'note.id = rn.id')
.where(`rn."initiatorId" IN (${removableInitiatorNotesQuery.getQuery()})`)
.distinctOn(['note.id']);
//#endregion
// A note tree can be deleted if there are no unremovable rows with the same rootId.
//
// `candidate_notes` will have the following structure after recursive query (some columns omitted):
// After performing a LEFT JOIN with `candidate_notes` as `unremovable`,
// the note tree containing unremovable notes will be anti-joined.
// For removable rows, the `unremovable` columns will have `NULL` values.
// | id | rootId | isRemovable |
// |-----|--------|-------------|
// | aaa | aaa | TRUE |
// | bbb | aaa | FALSE |
// | ccc | aaa | FALSE |
// | ddd | ddd | TRUE |
// | eee | ddd | TRUE |
// | fff | fff | TRUE |
// | ggg | ggg | FALSE |
//
const candidateNotesQuery = this.db.createQueryBuilder()
.select(`"${candidateNotesCteName}"."id"`, 'id')
.addSelect('unremovable."id" IS NULL', 'isRemovable')
.addSelect(`BOOL_OR("${candidateNotesCteName}"."isBase")`, 'isBase')
.addCommonTableExpression(
`((SELECT "base".* FROM (${candidateNotesQueryBase.orderBy('note.id', 'ASC').limit(currentLimit).getQuery()}) AS "base") UNION ${candidateNotesQueryInductive.getQuery()})`,
candidateNotesCteName,
{ recursive: true },
)
.from(candidateNotesCteName, candidateNotesCteName)
.leftJoin(candidateNotesCteName, 'unremovable', `unremovable."rootId" = "${candidateNotesCteName}"."rootId" AND unremovable."isRemovable" = FALSE`)
.groupBy(`"${candidateNotesCteName}"."id"`)
.addGroupBy('unremovable."id" IS NULL');
const stats = {
deletedCount: 0,
@ -123,74 +173,107 @@ export class CleanRemoteNotesProcessorService {
newest: null as number | null,
};
// The date limit for the newest note to be considered for deletion.
// All notes newer than this limit will always be retained.
const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes));
let cursor = '0'; // oldest note ID to start from
while (true) {
let lowThroughputWarned = false;
let transientErrors = 0;
for (;;) {
//#region check time
const batchBeginAt = Date.now();
const elapsed = batchBeginAt - startAt;
const progress = this.computeProgress(minId, newestLimit, cursorLeft > minId ? cursorLeft : minId);
if (elapsed >= maxDuration) {
this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`);
job.log('Reached maximum duration, stopping cleaning.');
job.log(`Reached maximum duration of ${maxDuration}ms, stopping... (last cursor: ${cursorLeft}, final progress ${progress}%)`);
job.updateProgress(100);
break;
}
job.updateProgress((elapsed / maxDuration) * 100);
const wallClockUsage = elapsed / maxDuration;
if (wallClockUsage > 0.5 && progress < 50 && !lowThroughputWarned) {
const msg = `Not projected to finish in time! (wall clock usage ${wallClockUsage * 100}% at ${progress}%, current limit ${currentLimit})`;
this.logger.warn(msg);
job.log(msg);
lowThroughputWarned = true;
}
job.updateProgress(progress);
//#endregion
// First, we fetch the initiator notes that are older than the newestLimit.
const initiatorNotes: { id: MiNote['id'] }[] = await initiatorQuery.setParameters({ cursor, newestLimit }).getRawMany();
const queryBegin = performance.now();
let noteIds = null;
// update the cursor to the newest initiatorId found in the fetched notes.
const newCursor = initiatorNotes.reduce((max, note) => note.id > max ? note.id : max, cursor);
try {
noteIds = await candidateNotesQuery.setParameters(
{ newestLimit, cursorLeft },
).getRawMany<{ id: MiNote['id'], isRemovable: boolean, isBase: boolean }>();
} catch (e) {
if (currentLimit > minimumLimit && e instanceof QueryFailedError && e.driverError?.code === '57014') {
// Statement timeout (maybe suddenly hit a large note tree), reduce the limit and try again
// continuous failures will eventually converge to currentLimit == minimumLimit and then throw
currentLimit = Math.max(minimumLimit, Math.floor(currentLimit * 0.25));
continue;
}
throw e;
}
if (initiatorNotes.length === 0 || cursor === newCursor || newCursor >= newestLimit) {
// If no notes were found or the cursor did not change, we can stop.
job.log('No more notes to clean. (no initiator notes found or cursor did not change.)');
if (noteIds.length === 0) {
job.log('No more notes to clean.');
break;
}
const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.setParameters({
initiatorIds: initiatorNotes.map(note => note.id),
newestLimit,
}).getRawMany();
const queryDuration = performance.now() - queryBegin;
// try to adjust such that each query takes about 1~5 seconds and reasonable NodeJS heap so the task stays responsive
// this should not oscillate..
if (queryDuration > 5000 || noteIds.length > 5000) {
currentLimit = Math.floor(currentLimit * 0.5);
} else if (queryDuration < 1000 && noteIds.length < 1000) {
currentLimit = Math.floor(currentLimit * 1.5);
}
// clamp to a sane range
currentLimit = Math.min(Math.max(currentLimit, minimumLimit), 5000);
cursor = newCursor;
const deletableNoteIds = noteIds.filter(result => result.isRemovable).map(result => result.id);
if (deletableNoteIds.length > 0) {
try {
await this.notesRepository.delete(deletableNoteIds);
if (notes.length > 0) {
await this.notesRepository.delete(notes.map(note => note.id));
for (const { id } of notes) {
const t = this.idService.parse(id).date.getTime();
if (stats.oldest === null || t < stats.oldest) {
stats.oldest = t;
for (const id of deletableNoteIds) {
const t = this.idService.parse(id).date.getTime();
if (stats.oldest === null || t < stats.oldest) {
stats.oldest = t;
}
if (stats.newest === null || t > stats.newest) {
stats.newest = t;
}
}
if (stats.newest === null || t > stats.newest) {
stats.newest = t;
stats.deletedCount += deletableNoteIds.length;
} catch (e) {
// check for integrity violation errors (class 23) that might have occurred between the check and the delete
// we can safely continue to the next batch
if (e instanceof QueryFailedError && e.driverError?.code?.startsWith('23')) {
transientErrors++;
job.log(`Error deleting notes: ${e} (transient race condition?)`);
} else {
throw e;
}
}
stats.deletedCount += notes.length;
}
job.log(`Deleted ${notes.length} from ${initiatorNotes.length} initiators; ${Date.now() - batchBeginAt}ms`);
cursorLeft = noteIds.filter(result => result.isBase).reduce((max, { id }) => id > max ? id : max, cursorLeft);
if (initiatorNotes.length < MAX_NOTE_COUNT_PER_QUERY) {
// If we fetched less than the maximum, it means there are no more notes to process.
job.log(`No more notes to clean. (fewer than MAX_NOTE_COUNT_PER_QUERY =${MAX_NOTE_COUNT_PER_QUERY}.)`);
break;
job.log(`Deleted ${noteIds.length} notes; ${Date.now() - batchBeginAt}ms`);
if (process.env.NODE_ENV !== 'test') {
await setTimeout(Math.min(1000 * 5, queryDuration)); // Wait a moment to avoid overwhelming the db
}
};
await setTimeout(1000 * 5); // Wait a moment to avoid overwhelming the db
if (transientErrors > 0) {
const msg = `${transientErrors} transient errors occurred while cleaning remote notes. You may need a second pass to complete the cleaning.`;
this.logger.warn(msg);
job.log(msg);
}
this.logger.succ('cleaning of remote notes completed.');
return {
@ -198,6 +281,7 @@ export class CleanRemoteNotesProcessorService {
oldest: stats.oldest,
newest: stats.newest,
skipped: false,
transientErrors,
};
}
}

View File

@ -158,6 +158,7 @@ describe('CleanRemoteNotesProcessorService', () => {
oldest: null,
newest: null,
skipped: true,
transientErrors: 0,
});
});
@ -172,6 +173,7 @@ describe('CleanRemoteNotesProcessorService', () => {
oldest: null,
newest: null,
skipped: false,
transientErrors: 0,
});
}, 3000);
@ -199,6 +201,7 @@ describe('CleanRemoteNotesProcessorService', () => {
oldest: expect.any(Number),
newest: expect.any(Number),
skipped: false,
transientErrors: 0,
});
// Check side-by-side from all notes