enhance: Remote Notes Cleaning timeout bailout logic (#16752)
* enhance: Remote Notes Cleaning timeout bailout logic Signed-off-by: eternal-flame-AD <yume@yumechi.jp> * fix: dynamic limit did not propagate Signed-off-by: eternal-flame-AD <yume@yumechi.jp> * enhance: reload parameters each batch Signed-off-by: eternal-flame-AD <yume@yumechi.jp> --------- Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
This commit is contained in:
parent
290fd8c7cc
commit
37a21cf54e
|
|
@ -12,8 +12,7 @@
|
|||
- Fix: ページのタイトルが長いとき、はみ出る問題を修正
|
||||
|
||||
### Server
|
||||
-
|
||||
|
||||
- Enhance: Remote Notes Cleaningが複雑度が高いノートの処理を中断せずに次のノートから再開するように
|
||||
|
||||
## 2025.10.2
|
||||
|
||||
|
|
|
|||
|
|
@ -51,6 +51,17 @@ export class CleanRemoteNotesProcessorService {
|
|||
skipped: boolean;
|
||||
transientErrors: number;
|
||||
}> {
|
||||
const getConfig = () => {
|
||||
return {
|
||||
enabled: this.meta.enableRemoteNotesCleaning,
|
||||
maxDuration: this.meta.remoteNotesCleaningMaxProcessingDurationInMinutes * 60 * 1000, // Convert minutes to milliseconds
|
||||
// The date limit for the newest note to be considered for deletion.
|
||||
// All notes newer than this limit will always be retained.
|
||||
newestLimit: this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes)),
|
||||
};
|
||||
};
|
||||
|
||||
const initialConfig = getConfig();
|
||||
if (!this.meta.enableRemoteNotesCleaning) {
|
||||
this.logger.info('Remote notes cleaning is disabled, skipping...');
|
||||
return {
|
||||
|
|
@ -64,13 +75,9 @@ export class CleanRemoteNotesProcessorService {
|
|||
|
||||
this.logger.info('cleaning remote notes...');
|
||||
|
||||
const maxDuration = this.meta.remoteNotesCleaningMaxProcessingDurationInMinutes * 60 * 1000; // Convert minutes to milliseconds
|
||||
const startAt = Date.now();
|
||||
|
||||
//#region queries
|
||||
// The date limit for the newest note to be considered for deletion.
|
||||
// All notes newer than this limit will always be retained.
|
||||
const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes));
|
||||
|
||||
// The condition for removing the notes.
|
||||
// The note must be:
|
||||
|
|
@ -92,7 +99,7 @@ export class CleanRemoteNotesProcessorService {
|
|||
const minId = (await this.notesRepository.createQueryBuilder('note')
|
||||
.select('MIN(note.id)', 'minId')
|
||||
.where({
|
||||
id: LessThan(newestLimit),
|
||||
id: LessThan(initialConfig.newestLimit),
|
||||
userHost: Not(IsNull()),
|
||||
replyId: IsNull(),
|
||||
renoteId: IsNull(),
|
||||
|
|
@ -155,12 +162,12 @@ export class CleanRemoteNotesProcessorService {
|
|||
// | fff | fff | TRUE |
|
||||
// | ggg | ggg | FALSE |
|
||||
//
|
||||
const candidateNotesQuery = this.db.createQueryBuilder()
|
||||
const candidateNotesQuery = ({ limit }: { limit: number }) => this.db.createQueryBuilder()
|
||||
.select(`"${candidateNotesCteName}"."id"`, 'id')
|
||||
.addSelect('unremovable."id" IS NULL', 'isRemovable')
|
||||
.addSelect(`BOOL_OR("${candidateNotesCteName}"."isBase")`, 'isBase')
|
||||
.addCommonTableExpression(
|
||||
`((SELECT "base".* FROM (${candidateNotesQueryBase.orderBy('note.id', 'ASC').limit(currentLimit).getQuery()}) AS "base") UNION ${candidateNotesQueryInductive.getQuery()})`,
|
||||
`((SELECT "base".* FROM (${candidateNotesQueryBase.orderBy('note.id', 'ASC').limit(limit).getQuery()}) AS "base") UNION ${candidateNotesQueryInductive.getQuery()})`,
|
||||
candidateNotesCteName,
|
||||
{ recursive: true },
|
||||
)
|
||||
|
|
@ -178,6 +185,11 @@ export class CleanRemoteNotesProcessorService {
|
|||
let lowThroughputWarned = false;
|
||||
let transientErrors = 0;
|
||||
for (;;) {
|
||||
const { enabled, maxDuration, newestLimit } = getConfig();
|
||||
if (!enabled) {
|
||||
this.logger.info('Remote notes cleaning is disabled, processing stopped...');
|
||||
break;
|
||||
}
|
||||
//#region check time
|
||||
const batchBeginAt = Date.now();
|
||||
|
||||
|
|
@ -205,13 +217,38 @@ export class CleanRemoteNotesProcessorService {
|
|||
let noteIds = null;
|
||||
|
||||
try {
|
||||
noteIds = await candidateNotesQuery.setParameters(
|
||||
noteIds = await candidateNotesQuery({ limit: currentLimit }).setParameters(
|
||||
{ newestLimit, cursorLeft },
|
||||
).getRawMany<{ id: MiNote['id'], isRemovable: boolean, isBase: boolean }>();
|
||||
} catch (e) {
|
||||
if (currentLimit > minimumLimit && e instanceof QueryFailedError && e.driverError?.code === '57014') {
|
||||
// Statement timeout (maybe suddenly hit a large note tree), reduce the limit and try again
|
||||
// continuous failures will eventually converge to currentLimit == minimumLimit and then throw
|
||||
if (e instanceof QueryFailedError && e.driverError?.code === '57014') {
|
||||
// Statement timeout (maybe suddenly hit a large note tree), if possible, reduce the limit and try again
|
||||
// if not possible, skip the current batch of notes and find the next root note
|
||||
if (currentLimit <= minimumLimit) {
|
||||
job.log('Local note tree complexity is too high, finding next root note...');
|
||||
|
||||
const idWindow = await this.notesRepository.createQueryBuilder('note')
|
||||
.select('id')
|
||||
.where('note.id > :cursorLeft')
|
||||
.andWhere(removalCriteria)
|
||||
.andWhere({ replyId: IsNull(), renoteId: IsNull() })
|
||||
.orderBy('note.id', 'ASC')
|
||||
.limit(minimumLimit + 1)
|
||||
.setParameters({ cursorLeft, newestLimit })
|
||||
.getRawMany<{ id?: MiNote['id'] }>();
|
||||
|
||||
job.log(`Skipped note IDs: ${idWindow.slice(0, minimumLimit).map(id => id.id).join(', ')}`);
|
||||
|
||||
const lastId = idWindow.at(minimumLimit)?.id;
|
||||
|
||||
if (!lastId) {
|
||||
job.log('No more notes to clean.');
|
||||
break;
|
||||
}
|
||||
|
||||
cursorLeft = lastId;
|
||||
continue;
|
||||
}
|
||||
currentLimit = Math.max(minimumLimit, Math.floor(currentLimit * 0.25));
|
||||
continue;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue