misskey/packages/backend/src/core/FanoutTimelineEndpointServi...

171 lines
6.0 KiB
TypeScript
Raw Normal View History

/*
* SPDX-FileCopyrightText: syuilo and other misskey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Inject, Injectable } from '@nestjs/common';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import type { MiUser } from '@/models/User.js';
import type { MiNote } from '@/models/Note.js';
import { Packed } from '@/misc/json-schema.js';
import type { NotesRepository } from '@/models/_.js';
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
import { FanoutTimelineName, FanoutTimelineService } from '@/core/FanoutTimelineService.js';
import { isUserRelated } from '@/misc/is-user-related.js';
import { isPureRenote } from '@/misc/is-pure-renote.js';
import { CacheService } from '@/core/CacheService.js';
import { isReply } from '@/misc/is-reply.js';
import { isInstanceMuted } from '@/misc/is-instance-muted.js';
type TimelineOptions = {
untilId: string | null,
sinceId: string | null,
limit: number,
allowPartial: boolean,
me?: { id: MiUser['id'] } | undefined | null,
useDbFallback: boolean,
redisTimelines: FanoutTimelineName[],
noteFilter?: (note: MiNote) => boolean,
alwaysIncludeMyNotes?: boolean;
ignoreAuthorFromMute?: boolean;
excludeNoFiles?: boolean;
excludeReplies?: boolean;
excludePureRenotes: boolean;
dbFallback: (untilId: string | null, sinceId: string | null, limit: number) => Promise<MiNote[]>,
};
@Injectable()
export class FanoutTimelineEndpointService {
constructor(
@Inject(DI.notesRepository)
private notesRepository: NotesRepository,
private noteEntityService: NoteEntityService,
private cacheService: CacheService,
private fanoutTimelineService: FanoutTimelineService,
) {
}
@bindThis
async timeline(ps: TimelineOptions): Promise<Packed<'Note'>[]> {
return await this.noteEntityService.packMany(await this.getMiNotes(ps), ps.me);
}
@bindThis
private async getMiNotes(ps: TimelineOptions): Promise<MiNote[]> {
let noteIds: string[];
let shouldFallbackToDb = false;
// 呼び出し元と以下の処理をシンプルにするためにdbFallbackを置き換える
if (!ps.useDbFallback) ps.dbFallback = () => Promise.resolve([]);
const redisResult = await this.fanoutTimelineService.getMulti(ps.redisTimelines, ps.untilId, ps.sinceId);
const redisResultIds = Array.from(new Set(redisResult.flat(1)));
redisResultIds.sort((a, b) => a > b ? -1 : 1);
noteIds = redisResultIds.slice(0, ps.limit);
shouldFallbackToDb = shouldFallbackToDb || (noteIds.length === 0);
if (!shouldFallbackToDb) {
let filter = ps.noteFilter ?? (_note => true);
if (ps.alwaysIncludeMyNotes && ps.me) {
const me = ps.me;
const parentFilter = filter;
filter = (note) => note.userId === me.id || parentFilter(note);
}
if (ps.excludeNoFiles) {
const parentFilter = filter;
filter = (note) => note.fileIds.length !== 0 && parentFilter(note);
}
if (ps.excludeReplies) {
const parentFilter = filter;
filter = (note) => !isReply(note, ps.me?.id) && parentFilter(note);
}
if (ps.excludePureRenotes) {
const parentFilter = filter;
filter = (note) => !isPureRenote(note) && parentFilter(note);
}
if (ps.me) {
const me = ps.me;
const [
userIdsWhoMeMuting,
userIdsWhoMeMutingRenotes,
userIdsWhoBlockingMe,
userMutedInstances,
] = await Promise.all([
this.cacheService.userMutingsCache.fetch(ps.me.id),
this.cacheService.renoteMutingsCache.fetch(ps.me.id),
this.cacheService.userBlockedCache.fetch(ps.me.id),
this.cacheService.userProfileCache.fetch(me.id).then(p => new Set(p.mutedInstances)),
]);
const parentFilter = filter;
filter = (note) => {
if (isUserRelated(note, userIdsWhoBlockingMe, ps.ignoreAuthorFromMute)) return false;
if (isUserRelated(note, userIdsWhoMeMuting, ps.ignoreAuthorFromMute)) return false;
if (isPureRenote(note) && isUserRelated(note, userIdsWhoMeMutingRenotes, ps.ignoreAuthorFromMute)) return false;
if (isInstanceMuted(note, userMutedInstances)) return false;
return parentFilter(note);
};
}
const redisTimeline: MiNote[] = [];
let readFromRedis = 0;
let lastSuccessfulRate = 1; // rateをキャッシュする
while ((redisResultIds.length - readFromRedis) !== 0) {
const remainingToRead = ps.limit - redisTimeline.length;
// DBからの取り直しを減らす初回と同じ割合以上で成功すると仮定するが、クエリの長さを考えて三倍まで
const countToGet = remainingToRead * Math.ceil(Math.min(1.1 / lastSuccessfulRate, 3));
noteIds = redisResultIds.slice(readFromRedis, readFromRedis + countToGet);
readFromRedis += noteIds.length;
const gotFromDb = await this.getAndFilterFromDb(noteIds, filter);
redisTimeline.push(...gotFromDb);
lastSuccessfulRate = gotFromDb.length / noteIds.length;
if (ps.allowPartial ? redisTimeline.length !== 0 : redisTimeline.length >= ps.limit) {
// 十分Redisからとれた
return redisTimeline.slice(0, ps.limit);
}
}
// まだ足りない分はDBにフォールバック
const remainingToRead = ps.limit - redisTimeline.length;
const gotFromDb = await ps.dbFallback(noteIds[noteIds.length - 1], ps.sinceId, remainingToRead);
redisTimeline.push(...gotFromDb);
return redisTimeline;
}
return await ps.dbFallback(ps.untilId, ps.sinceId, ps.limit);
}
private async getAndFilterFromDb(noteIds: string[], noteFilter: (note: MiNote) => boolean): Promise<MiNote[]> {
const query = this.notesRepository.createQueryBuilder('note')
.where('note.id IN (:...noteIds)', { noteIds: noteIds })
.innerJoinAndSelect('note.user', 'user')
.leftJoinAndSelect('note.reply', 'reply')
.leftJoinAndSelect('note.renote', 'renote')
.leftJoinAndSelect('reply.user', 'replyUser')
.leftJoinAndSelect('renote.user', 'renoteUser')
.leftJoinAndSelect('note.channel', 'channel');
const notes = (await query.getMany()).filter(noteFilter);
notes.sort((a, b) => a.id > b.id ? -1 : 1);
return notes;
}
}