290 lines
10 KiB
TypeScript
290 lines
10 KiB
TypeScript
/*
|
|
* SPDX-FileCopyrightText: syuilo and misskey-project
|
|
* SPDX-License-Identifier: AGPL-3.0-only
|
|
*/
|
|
|
|
import { setTimeout } from 'node:timers/promises';
|
|
import { Inject, Injectable } from '@nestjs/common';
|
|
import { DataSource, IsNull, LessThan, QueryFailedError, Not } from 'typeorm';
|
|
import { DI } from '@/di-symbols.js';
|
|
import type { MiMeta, MiNote, NotesRepository } from '@/models/_.js';
|
|
import type Logger from '@/logger.js';
|
|
import { bindThis } from '@/decorators.js';
|
|
import { IdService } from '@/core/IdService.js';
|
|
import { QueueLoggerService } from '../QueueLoggerService.js';
|
|
import type * as Bull from 'bullmq';
|
|
|
|
/**
 * Queue processor that deletes old remote notes in batches.
 *
 * A note is only deleted when the whole tree it roots (its replies and renotes,
 * as discovered by the recursive query in {@link process}) satisfies the removal
 * criteria. Work is bounded by the configured maximum processing duration.
 */
@Injectable()
export class CleanRemoteNotesProcessorService {
	private logger: Logger;

	constructor(
		@Inject(DI.meta)
		private meta: MiMeta,

		@Inject(DI.notesRepository)
		private notesRepository: NotesRepository,

		@Inject(DI.db)
		private db: DataSource,

		private idService: IdService,
		private queueLoggerService: QueueLoggerService,
	) {
		this.logger = this.queueLoggerService.logger.createSubLogger('clean-remote-notes');
	}

	/**
	 * Estimates how far the cursor has advanced through the ID range, as a percentage.
	 *
	 * The estimate is based on the timestamps parsed from the three IDs.
	 *
	 * @param minId ID marking the start of the range
	 * @param maxId ID marking the end of the range
	 * @param cursorLeft ID of the current position within the range
	 * @returns a finite value clamped to [0, 100]; 0 when the range spans no time
	 *          (or is inverted), since a ratio cannot be computed in that case
	 */
	@bindThis
	private computeProgress(minId: string, maxId: string, cursorLeft: string): number {
		const minTs = this.idService.parse(minId).date.getTime();
		const maxTs = this.idService.parse(maxId).date.getTime();
		const cursorTs = this.idService.parse(cursorLeft).date.getTime();

		const span = maxTs - minTs;
		// Guard the division: a zero span would otherwise yield NaN or Infinity.
		if (!(span > 0)) {
			return 0;
		}

		const percent = ((cursorTs - minTs) / span) * 100;
		if (!Number.isFinite(percent)) {
			return 0;
		}
		return Math.min(100, Math.max(0, percent));
	}

	/**
	 * Runs one cleaning pass over remote notes.
	 *
	 * Repeatedly selects a batch of root notes (no reply / renote target) past the
	 * cursor, walks down their reply/renote trees, deletes every note belonging to a
	 * tree in which all visited notes are removable, then advances the cursor. The
	 * batch size is adapted to the observed query duration and result size. The loop
	 * stops when no candidates remain or the configured maximum duration has elapsed.
	 *
	 * @param job the queue job; used for progress reporting and job-level logging
	 * @returns statistics of the pass: number of deleted notes, timestamps (ms) of the
	 *          oldest and newest deleted notes, whether the pass was skipped because
	 *          cleaning is disabled, and how many batches hit a transient delete error
	 * @throws rethrows any query error that is not handled as a retryable statement
	 *         timeout (select) or an integrity violation (delete)
	 */
	@bindThis
	public async process(job: Bull.Job<Record<string, unknown>>): Promise<{
		deletedCount: number;
		oldest: number | null;
		newest: number | null;
		skipped: boolean;
		transientErrors: number;
	}> {
		if (!this.meta.enableRemoteNotesCleaning) {
			this.logger.info('Remote notes cleaning is disabled, skipping...');
			return {
				deletedCount: 0,
				oldest: null,
				newest: null,
				skipped: true,
				transientErrors: 0,
			};
		}

		this.logger.info('cleaning remote notes...');

		const maxDuration = this.meta.remoteNotesCleaningMaxProcessingDurationInMinutes * 60 * 1000; // Convert minutes to milliseconds
		const startAt = Date.now();

		//#region queries
		// The date limit for the newest note to be considered for deletion.
		// All notes newer than this limit will always be retained.
		const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes));

		// The condition for removing a note.
		// The note must:
		// - be old enough (older than the newestLimit)
		// - not be clipped
		// - not be embedded in a page
		// - be a remote note (userHost is not null)
		// - not be pinned on a user profile
		// - not be favorited by any user
		// - not have a reaction from a local user
		const removalCriteria = [
			'note."id" < :newestLimit',
			'note."clippedCount" = 0',
			'note."pageCount" = 0',
			'note."userHost" IS NOT NULL',
			'NOT EXISTS (SELECT 1 FROM user_note_pining WHERE "noteId" = note."id")',
			'NOT EXISTS (SELECT 1 FROM note_favorite WHERE "noteId" = note."id")',
			'NOT EXISTS (SELECT 1 FROM note_reaction INNER JOIN "user" ON note_reaction."userId" = "user".id WHERE note_reaction."noteId" = note."id" AND "user"."host" IS NULL)',
		].join(' AND ');

		// Oldest remote root note below the limit; only used as the lower bound for progress reporting.
		const minId = (await this.notesRepository.createQueryBuilder('note')
			.select('MIN(note.id)', 'minId')
			.where({
				id: LessThan(newestLimit),
				userHost: Not(IsNull()),
				replyId: IsNull(),
				renoteId: IsNull(),
			})
			.getRawOne<{ minId?: MiNote['id'] }>())?.minId;

		if (!minId) {
			this.logger.info('No notes can possibly be deleted, skipping...');
			return {
				deletedCount: 0,
				oldest: null,
				newest: null,
				skipped: false,
				transientErrors: 0,
			};
		}

		// start with a conservative limit and adjust it based on the query duration
		const minimumLimit = 10;
		const maximumLimit = 5000;
		let currentLimit = 100;
		let cursorLeft = '0';

		const candidateNotesCteName = 'candidate_notes';

		// tree walk down all root notes, short-circuit when the first unremovable note is found
		// (the per-batch LIMIT is applied when the full query is built for each batch)
		const candidateNotesQueryBase = this.notesRepository.createQueryBuilder('note')
			.select('note."id"', 'id')
			.addSelect('note."replyId"', 'replyId')
			.addSelect('note."renoteId"', 'renoteId')
			.addSelect('note."id"', 'rootId')
			.addSelect('TRUE', 'isRemovable')
			.addSelect('TRUE', 'isBase')
			.where('note."id" > :cursorLeft')
			.andWhere(removalCriteria)
			.andWhere({ replyId: IsNull(), renoteId: IsNull() })
			.orderBy('note.id', 'ASC');

		// Recursive step: replies to / renotes of rows already collected, as long as the parent is removable.
		const candidateNotesQueryInductive = this.notesRepository.createQueryBuilder('note')
			.select('note.id', 'id')
			.addSelect('note."replyId"', 'replyId')
			.addSelect('note."renoteId"', 'renoteId')
			.addSelect('parent."rootId"', 'rootId')
			.addSelect(removalCriteria, 'isRemovable')
			.addSelect('FALSE', 'isBase')
			.innerJoin(candidateNotesCteName, 'parent', 'parent."id" = note."replyId" OR parent."id" = note."renoteId"')
			.where('parent."isRemovable" = TRUE');

		// A note tree can be deleted if there are no unremovable rows with the same rootId.
		//
		// `candidate_notes` will have the following structure after recursive query (some columns omitted):
		//
		// | id  | rootId | isRemovable |
		// |-----|--------|-------------|
		// | aaa | aaa    | TRUE        |
		// | bbb | aaa    | FALSE       |
		// | ccc | aaa    | FALSE       |
		// | ddd | ddd    | TRUE        |
		// | eee | ddd    | TRUE        |
		// | fff | fff    | TRUE        |
		// | ggg | ggg    | FALSE       |
		//
		// After performing a LEFT JOIN with `candidate_notes` as `unremovable`,
		// the note tree containing unremovable notes will be anti-joined.
		// For removable rows, the `unremovable` columns will have `NULL` values.
		//
		// The query is rebuilt for every batch because the LIMIT is rendered into the CTE text:
		// building it only once would freeze the limit at its initial value and make the
		// adaptive sizing (and the statement-timeout back-off) below ineffective.
		const buildCandidateNotesQuery = (limit: number) => this.db.createQueryBuilder()
			.select(`"${candidateNotesCteName}"."id"`, 'id')
			.addSelect('unremovable."id" IS NULL', 'isRemovable')
			.addSelect(`BOOL_OR("${candidateNotesCteName}"."isBase")`, 'isBase')
			.addCommonTableExpression(
				`((SELECT "base".* FROM (${candidateNotesQueryBase.limit(limit).getQuery()}) AS "base") UNION ${candidateNotesQueryInductive.getQuery()})`,
				candidateNotesCteName,
				{ recursive: true },
			)
			.from(candidateNotesCteName, candidateNotesCteName)
			.leftJoin(candidateNotesCteName, 'unremovable', `unremovable."rootId" = "${candidateNotesCteName}"."rootId" AND unremovable."isRemovable" = FALSE`)
			.groupBy(`"${candidateNotesCteName}"."id"`)
			.addGroupBy('unremovable."id" IS NULL');
		//#endregion

		const stats = {
			deletedCount: 0,
			oldest: null as number | null,
			newest: null as number | null,
		};

		let lowThroughputWarned = false;
		let transientErrors = 0;
		for (;;) {
			//#region check time
			const batchBeginAt = Date.now();

			const elapsed = batchBeginAt - startAt;

			// The cursor starts at '0', so fall back to minId until it has passed it.
			const progress = this.computeProgress(minId, newestLimit, cursorLeft > minId ? cursorLeft : minId);

			if (elapsed >= maxDuration) {
				job.log(`Reached maximum duration of ${maxDuration}ms, stopping... (last cursor: ${cursorLeft}, final progress ${progress}%)`);
				job.updateProgress(100);
				break;
			}

			const wallClockUsage = elapsed / maxDuration;
			if (wallClockUsage > 0.5 && progress < 50 && !lowThroughputWarned) {
				const msg = `Not projected to finish in time! (wall clock usage ${wallClockUsage * 100}% at ${progress}%, current limit ${currentLimit})`;
				this.logger.warn(msg);
				job.log(msg);
				lowThroughputWarned = true;
			}
			job.updateProgress(progress);
			//#endregion

			const queryBegin = performance.now();
			let noteIds = null;

			try {
				noteIds = await buildCandidateNotesQuery(currentLimit).setParameters(
					{ newestLimit, cursorLeft },
				).getRawMany<{ id: MiNote['id'], isRemovable: boolean, isBase: boolean }>();
			} catch (e) {
				if (currentLimit > minimumLimit && e instanceof QueryFailedError && e.driverError?.code === '57014') {
					// Statement timeout (maybe suddenly hit a large note tree), reduce the limit and try again
					// continuous failures will eventually converge to currentLimit == minimumLimit and then throw
					currentLimit = Math.max(minimumLimit, Math.floor(currentLimit * 0.25));
					continue;
				}
				throw e;
			}

			if (noteIds.length === 0) {
				job.log('No more notes to clean.');
				break;
			}

			const queryDuration = performance.now() - queryBegin;
			// try to adjust such that each query takes about 1~5 seconds and reasonable NodeJS heap so the task stays responsive
			// this should not oscillate..
			if (queryDuration > 5000 || noteIds.length > 5000) {
				currentLimit = Math.floor(currentLimit * 0.5);
			} else if (queryDuration < 1000 && noteIds.length < 1000) {
				currentLimit = Math.floor(currentLimit * 1.5);
			}
			// clamp to a sane range
			currentLimit = Math.min(Math.max(currentLimit, minimumLimit), maximumLimit);

			const deletableNoteIds = noteIds.filter(result => result.isRemovable).map(result => result.id);
			let deletedInBatch = 0;
			if (deletableNoteIds.length > 0) {
				try {
					await this.notesRepository.delete(deletableNoteIds);

					for (const id of deletableNoteIds) {
						const t = this.idService.parse(id).date.getTime();
						if (stats.oldest === null || t < stats.oldest) {
							stats.oldest = t;
						}
						if (stats.newest === null || t > stats.newest) {
							stats.newest = t;
						}
					}

					deletedInBatch = deletableNoteIds.length;
					stats.deletedCount += deletedInBatch;
				} catch (e) {
					// check for integrity violation errors (class 23) that might have occurred between the check and the delete
					// we can safely continue to the next batch
					if (e instanceof QueryFailedError && e.driverError?.code?.startsWith('23')) {
						transientErrors++;
						job.log(`Error deleting notes: ${e} (transient race condition?)`);
					} else {
						throw e;
					}
				}
			}

			// Advance past the newest root note of this batch; only base rows were selected by the cursor.
			cursorLeft = noteIds.filter(result => result.isBase).reduce((max, { id }) => id > max ? id : max, cursorLeft);

			// Report what was actually deleted, not merely how many rows were examined.
			job.log(`Deleted ${deletedInBatch} notes (${noteIds.length} examined); ${Date.now() - batchBeginAt}ms`);

			if (process.env.NODE_ENV !== 'test') {
				await setTimeout(Math.min(1000 * 5, queryDuration)); // Wait a moment to avoid overwhelming the db
			}
		}

		if (transientErrors > 0) {
			const msg = `${transientErrors} transient errors occurred while cleaning remote notes. You may need a second pass to complete the cleaning.`;
			this.logger.warn(msg);
			job.log(msg);
		}
		this.logger.succ('cleaning of remote notes completed.');

		return {
			deletedCount: stats.deletedCount,
			oldest: stats.oldest,
			newest: stats.newest,
			skipped: false,
			transientErrors,
		};
	}
}
|